Fix potential race in route table load
[ric-plt/lib/rmr.git] / src / rmr / common / src / rtc_static.c
index 45a277a..2deef8e 100644 (file)
@@ -1,8 +1,8 @@
 // : vi ts=4 sw=4 noet :
 /*
 ==================================================================================
-       Copyright (c) 2019 Nokia
-       Copyright (c) 2018-2019 AT&T Intellectual Property.
+       Copyright (c) 2019-2020 Nokia
+       Copyright (c) 2018-2020 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
@@ -84,6 +84,9 @@ static void* rtc_file( void* vctx ) {
 
                read_static_rt( ctx, vlevel );                                          // seed the route table if one provided
 
+               if( ctx->shutdown != 0 ) {                                                      // allow for graceful termination and unit testing
+                       return NULL;
+               }
                sleep( 60 );
        }
 }
@@ -102,28 +105,103 @@ static int refresh_vlevel( int vfd ) {
        return vlevel;
 }
 
+/*
+       Rtc_parse_msg parses a single message from the route manager. We allow multiple, newline terminated,
+       records in each message; it is required that the last record in the message be complete (we do not
+       reconstruct records split over multiple messages).  For each record, we call the record parser
+       to parse and add the information to the table being built.
+
+       This function was broken from the main rtc() function in order to be able to unit test it. Without
+       this as a standalone funciton, it was impossible to simulate a message arriving on the RTC's private
+       context.
+
+       To reduce malloc/free cycles, we allocate a static work buffer and expand it when needed; in other
+       words, this is not thread safe but it shouldn't need to be.
+*/
+static void rtc_parse_msg( uta_ctx_t *ctx, uta_ctx_t* pvt_cx, rmr_mbuf_t* msg, int vlevel,  int* flags ) {
+       static  unsigned char* pbuf = NULL;
+       static  int pbuf_size = 0;
+
+       unsigned char* payload;
+       unsigned char* curr;
+       unsigned char* nextr;
+       int mlen;
+
+       payload = msg->payload;
+       mlen = msg->len;                                        // usable bytes in the payload
+
+       if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: received rt message type=%d len=%d\n", msg->mtype, (int) mlen );
+       switch( msg->mtype ) {
+               case RMRRM_TABLE_DATA:
+                       if( (*flags & RTCFL_HAVE_UPDATE) == 0 ) {
+                               *flags |= RTCFL_HAVE_UPDATE;
+                               rmr_vlog( RMR_VL_INFO, "message flow from route manager starts\n" );
+                       }
+
+                       if( pbuf_size <= mlen ) {
+                               if( pbuf ) {
+                                       free( pbuf );
+                               }
+                               if( mlen < 512 ) {
+                                       pbuf_size = 1024;
+                               } else {
+                                       pbuf_size = mlen * 2;
+                               }
+                               pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
+                       }
+                       memcpy( pbuf, payload, mlen );
+                       pbuf[mlen] = 0;                                                                         // don't depend on sender making this a legit string
+                       if( vlevel > 1 ) {
+                               rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: rt message: (%s)\n", pbuf );
+                       }
+
+                       curr = pbuf;
+                       while( curr ) {                                                                         // loop over each record in the buffer
+                               nextr = strchr( (char *) curr, '\n' );                  // allow multiple newline records, find end of current and mark
+
+                               if( nextr ) {
+                                       *(nextr++) = 0;
+                               }
+
+                               if( vlevel > 1 ) {
+                                       rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr );
+                               }
+                               parse_rt_rec( ctx, pvt_cx, curr, vlevel, msg );         // parse record and add to in progress table; ack using rts to msg
+
+                               curr = nextr;
+                       }
+
+                       msg->len = 0;                           // force back into the listen loop
+                       break;
+
+               default:
+                       rmr_vlog( RMR_VL_WARN, "rmr_rtc: invalid message type=%d len=%d\n", msg->mtype, (int) msg->len );
+                       break;
+       }
+}
+
 /*
        Route Table Collector
        A side thread which either attempts to connect and request a table
        from the Route Manager, or opens a port and listens for Route Manager
        to push table updates.
 
-       It may do other things along the way (latency measurements, alarms, 
+       It may do other things along the way (latency measurements, alarms,
        respond to RMR pings, etc.).
 
        The behaviour with respect to listening for Route Manager updates vs
        the initiation of the connection and sending a request depends on the
        value of the ENV_RTG_ADDR (RMR_RTG_SVC)  environment variable. If
        host:port, or IP:port, is given, then we assume that we make the connection
-       and send a request for the table (request mode).  If the variable is just 
-       a port, then we assume Route Manager will connect and push updates (original 
+       and send a request for the table (request mode).  If the variable is just
+       a port, then we assume Route Manager will connect and push updates (original
        method).
 
        If the variable is not defined, the default behaviour, in order to be
-       backwards compatable, depends on the presence of the ENV_CTL_PORT 
+       backwards compatable, depends on the presence of the ENV_CTL_PORT
        (RMR_CTL_PORT) variable (new with the support for requesting a table).
 
-       
+
        ENV_CTL_PORT    ENV_RTG_ADDR    Behaviour
        unset                   unset                   Open default CTL port (DEF_CTL_PORT) and
                                                                        wait for Rt Mgr to push tables
@@ -134,15 +212,15 @@ static int refresh_vlevel( int vfd ) {
                                                                        used is the value set by ENV_CTL_PORT.
 
        unset                   set                             As described above. The default control
-                                                                       port (DEF_CTL_PORT) is used. 
+                                                                       port (DEF_CTL_PORT) is used.
 
-       When we are running in request mode, then we will send the RMR message 
-       RMRRM_REFRESH to this address (wormhole) as a request for the route manager 
+       When we are running in request mode, then we will send the RMR message
+       RMRRM_REFRESH to this address (wormhole) as a request for the route manager
        to send a new table. We will attempt to connect and send requests until
        we have a table. Calls to rmr_ready() will report FALSE until a table is
        loaded _unless_ a seed table was given.
 
-       Route table information is expected to arrive on RMR messages with type 
+       Route table information is expected to arrive on RMR messages with type
        RMRRM_TABLE_DATA.  There is NOT a specific message type for each possible
        table record, so the payload is as it appears in the seed file or as
        delivered in old versions.  It may take several RMRRM_TABLE_DATA messages
@@ -163,22 +241,16 @@ static void* rtc( void* vctx ) {
        uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
        uta_ctx_t*      pvt_cx;                                 // private context for session with rtg
        rmr_mbuf_t*     msg = NULL;                             // message from rtg
-       char*   payload;                                        // payload in the message
-       size_t  mlen;
        char*   my_port;                                        // the port number that we will listen on (4561 has been the default for this)
        char*   rtg_addr;                                       // host:port address of route table generator (route manager)
        char*   daddr;                                          // duplicated rtg address string to parse/trash
        size_t  buf_size;                                       // nng needs var pointer not just size?
-       char*   nextr;                                          // pointer at next record in the message
-       char*   curr;                                           // current record
-       int     i;
+       int             i;
        long    blabber = 0;                            // time of last blabber so we don't flood if rtg goes bad
        int             cstate = -1;                            // connection state to rtg
        int             state;                                          // processing state of some nng function
        char*   tokens[128];
        char    wbuf[128];
-       char*   pbuf = NULL;
-       int             pbuf_size = 0;                          // number allocated in pbuf
        int             ntoks;
        int             vfd = -1;                                       // verbose file des if we have one
        int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
@@ -189,6 +261,8 @@ static void* rtc( void* vctx ) {
        int             count_delay = 30;                       // number of seconds between writing count info; initially every 30s
        int             bump_freq = 0;                          // time at which we will bump count frequency to every 5 minutes
        int             flags = 0;
+       int             rt_req_freq = DEF_RTREQ_FREQ;   // request frequency (sec) when wanting a new table
+       int             nxt_rt_req = 0;                                 // time of next request
 
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
@@ -196,19 +270,33 @@ static void* rtc( void* vctx ) {
                return NULL;
        }
 
+       if( (ctx->ephash = rmr_sym_alloc( RT_SIZE )) == NULL ) {                // master hash table for endpoints (each rt will reference this)
+               rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: unable to allocate an endpoint hash table\n" );
+               return NULL;
+       }
+
        if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
                vfd = open( eptr, O_RDONLY );
                vlevel = refresh_vlevel( vfd );
        }
 
-       ctx->flags |= CFL_NO_RTACK;                     // don't ack when reading from a file
+       if( (eptr = getenv( ENV_RTREQ_FREA )) != NULL ) {
+               rt_req_freq = atoi( eptr );
+               if( rt_req_freq < 1 || rt_req_freq > 300 ) {
+                       rt_req_freq = DEF_RTREQ_FREQ;
+                       rmr_vlog( RMR_VL_WARN, "rmr_rtc: RT request frequency (%d) out of range (1-300), using default", DEF_RTREQ_FREQ );
+               }
+       }
+       rmr_vlog( RMR_VL_INFO, "rmr_rtc: RT request frequency set to: %d seconds", rt_req_freq );
+
+       ctx->flags |= CFL_NO_RTACK;                             // don't ack when reading from a file
        read_static_rt( ctx, vlevel );                  // seed the route table if one provided
        ctx->flags &= ~CFL_NO_RTACK;
 
 
        my_port = getenv( ENV_CTL_PORT );                               // default port to listen on (likely 4561)
        if( my_port == NULL || ! *my_port ) {                   // if undefined, then go with default
-               my_port = DEF_CTL_PORT; 
+               my_port = DEF_CTL_PORT;
                daddr = DEF_CTL_PORT;                                           // backwards compat; if ctl port not hard defined, default is to listen
        } else {
                daddr = DEF_RTG_WK_ADDR;                                        // if ctl port is defined, then default changes to connecting to well known RM addr
@@ -235,7 +323,7 @@ static void* rtc( void* vctx ) {
                                flags |= RTCFL_HAVE_UPDATE;                                     // and signal not to try to request an update
                                my_port = tokens[1];
                        } else {
-                               // rtg_addr points at rt mgr address and my port set from env or default stands as is   
+                               // rtg_addr points at rt mgr address and my port set from env or default stands as is
                        }
                        break;
        }
@@ -259,11 +347,12 @@ static void* rtc( void* vctx ) {
        blabber = 0;
        while( 1 ) {                                                                                                            // until the cows return, pigs fly, or somesuch event likely not to happen
                while( msg == NULL || msg->len <= 0 ) {                                                 // until we actually have something from the other side
-                       if( (flags & RTCFL_HAVE_UPDATE) == 0 ) {                                                // no route table updated from rt mgr; request one
+                       if( (flags & RTCFL_HAVE_UPDATE) == 0 && time( NULL ) >= nxt_rt_req ) {                  // no route table updated from rt mgr; request one
                                if( ctx->rtg_whid < 0 ) {
                                        ctx->rtg_whid = rmr_wh_open( pvt_cx, rtg_addr );
                                }
                                send_update_req( pvt_cx, ctx );
+                               nxt_rt_req = time( NULL ) + rt_req_freq;
                        }
 
                        msg = rmr_torcv_msg( pvt_cx, msg, 1000 );
@@ -278,63 +367,16 @@ static void* rtc( void* vctx ) {
                                        rt_epcounts( ctx->rtable, ctx->my_name );
                                }
                        }
+
+                       if( ctx->shutdown != 0 ) {
+                               break;                                                  // mostly for unit test, but allows a forced stop
+                       }
                }
 
                vlevel = refresh_vlevel( vfd );                 // ensure it's fresh when we get a message
 
                if( msg != NULL && msg->len > 0 ) {
-                       payload = msg->payload;
-                       mlen = msg->len;                                        // usable bytes in the payload
-                       if( vlevel > 1 ) {
-                               rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: received rt message type=%d len=%d bytes (%s)\n", msg->mtype, (int) mlen, msg->payload );
-                       } else {
-                               if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: received rt message type=%d len=%d\n", msg->mtype, (int) mlen );
-                       }
-
-                       switch( msg->mtype ) {
-                               case RMRRM_TABLE_DATA:
-                                       if( (flags & RTCFL_HAVE_UPDATE) == 0 ) {
-                                               flags |= RTCFL_HAVE_UPDATE;
-                                               rmr_vlog( RMR_VL_INFO, "message flow from route manager starts\n" );
-                                       }
-
-                                       if( pbuf_size <= mlen ) {
-                                               if( pbuf ) {
-                                                       free( pbuf );
-                                               }
-                                               if( mlen < 512 ) {
-                                                       pbuf_size = 512;
-                                               } else {
-                                                       pbuf_size = mlen * 2;
-                                               }
-                                               pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
-                                       }
-                                       memcpy( pbuf, payload, mlen );
-                                       pbuf[mlen] = 0;                                                                         // don't depend on sender making this a legit string
-
-                                       curr = pbuf;
-                                       while( curr ) {                                                         // loop over each record in the buffer
-                                               nextr = strchr( curr, '\n' );                   // allow multiple newline records, find end of current and mark
-
-                                               if( nextr ) {
-                                                       *(nextr++) = 0;
-                                               }
-
-                                               if( vlevel > 1 ) {
-                                                       rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr );
-                                               }
-                                               parse_rt_rec( ctx, pvt_cx, curr, vlevel );              // parse record and add to in progress table
-
-                                               curr = nextr;
-                                       }
-
-                                       msg->len = 0;                           // force back into the listen loop
-                                       break;
-
-                               default:
-                                       rmr_vlog( RMR_VL_WARN, "rmr_rtc: invalid message type=%d len=%d\n", msg->mtype, (int) msg->len );
-                                       break;
-                       }
+                       rtc_parse_msg( ctx, pvt_cx, msg, vlevel, &flags );
                }
 
                if( ctx->shutdown ) {           // mostly for testing, but allows user app to close us down if rmr_*() function sets this
@@ -347,7 +389,7 @@ static void* rtc( void* vctx ) {
 }
 
 #ifndef SI95_BUILD
-// this is nng specific inas much as we allow raw (non-RMR) messages 
+// this is nng specific inas much as we allow raw (non-RMR) messages
 
 /*
        NOTE:   This is the original rtc code when we supported "raw" nano/nng messages
@@ -405,7 +447,7 @@ static void* raw_rtc( void* vctx ) {
        size_t  buf_size;                                       // nng needs var pointer not just size?
        char*   nextr;                                          // pointer at next record in the message
        char*   curr;                                           // current record
-       int     i;
+       int             i;
        long    blabber = 0;                            // time of last blabber so we don't flood if rtg goes bad
        int             cstate = -1;                            // connection state to rtg
        int             state;                                          // processing state of some nng function
@@ -434,7 +476,7 @@ static void* raw_rtc( void* vctx ) {
        if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
                vfd = open( eptr, O_RDONLY );
                vlevel = refresh_vlevel( vfd );
-       }                
+       }
 
        read_static_rt( ctx, vlevel );                                          // seed the route table if one provided
 
@@ -475,7 +517,7 @@ static void* raw_rtc( void* vctx ) {
                        sleep( count_delay );
                        rt_epcounts( ctx->rtable, ctx->my_name );
                }
-               
+
                free( fport );                                  // parinoid free and return
                return NULL;
        }
@@ -570,9 +612,9 @@ static void* raw_rtc( void* vctx ) {
                                        rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr );
                                }
                                if( raw_interface ) {
-                                       parse_rt_rec( ctx, NULL, curr, vlevel );                // nil pvt to parser as we can't ack messages
+                                       parse_rt_rec( ctx, NULL, curr, vlevel, NULL );          // nil pvt to parser as we can't ack messages
                                } else {
-                                       parse_rt_rec( ctx, pvt_cx, curr, vlevel );              // parse record and add to in progress table
+                                       parse_rt_rec( ctx, pvt_cx, curr, vlevel, msg );         // parse record and add to in progress table
                                }
 
                                curr = nextr;