Allow endpoint selection based on meid in message
[ric-plt/lib/rmr.git] / src / rmr / common / src / rtc_static.c
index 013dc8d..58fc940 100644 (file)
 #include <sys/stat.h>
 #include <unistd.h>
 
+static int refresh_vlevel( int vfd ) {
+       int vlevel = 0;
+       char    rbuf[128];
+
+       if( vfd >= 0 ) {                                        // if file is open, read current value
+               rbuf[0] = 0;
+               lseek( vfd, 0, 0 );
+               read( vfd, rbuf, 10 );
+               vlevel = atoi( rbuf );
+       }
+
+       return vlevel;
+}
+
 /*
        Route Table Collector
        A side thread which opens a socket and subscribes to a routing table generator.
 
        Buffers received from the route table generator can contain multiple newline terminated
        records, but each buffer must be less than 4K in length, and the last record in a
-       buffere may NOT be split across buffers.
+       buffer may NOT be split across buffers.
 
+       Other chores:
+       In addition to the primary task of getting, vetting, and installing a new route table, or
+       updates to the existing table, this thread will periodically cause the send counts for each
+       endpoint known to be written to standard error. The frequency is once every 180 seconds, and
+       more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
 */
 static void* rtc( void* vctx ) {
        uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
@@ -99,6 +118,13 @@ static void* rtc( void* vctx ) {
        int             vfd = -1;                                       // verbose file des if we have one
        int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
        char*   eptr;
+       int             epfd = -1;                                      // fd for epoll so we can multi-task
+       struct  epoll_event events[1];          // list of events to give to epoll; we only have one we care about
+       struct  epoll_event epe;                        // event definition for event to listen to
+       int             rcv_fd = -1;                            // pollable file des from NNG to use for timeout
+       int             count_delay = 30;                       // number of seconds between writing count info; initially every 30s
+       int             bump_freq = 0;                          // time at which we will bump count frequency to every 5 minutes
+
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
                fprintf( stderr, "[CRI] rmr_rtc: internal mishap: context passed in was nil\n" );
@@ -107,13 +133,8 @@ static void* rtc( void* vctx ) {
 
        if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
                vfd = open( eptr, O_RDONLY );
-               if( vfd >= 0 ) {
-                       wbuf[0] = 0;
-                       lseek( vfd, 0, 0 );
-                       read( vfd, wbuf, 10 );
-                       vlevel = atoi( wbuf );
-               }
-       }
+               vlevel = refresh_vlevel( vfd );
+       }                
 
        read_static_rt( ctx, vlevel );                                          // seed the route table if one provided
 
@@ -146,30 +167,71 @@ static void* rtc( void* vctx ) {
 
        if( (pvt_cx = init( port, MAX_RTG_MSG_SZ, FL_NOTHREAD )) == NULL ) {                            // open a private context
                fprintf( stderr, "[CRI] rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
-               free( fport );
+
+               while( TRUE ) {                                                                                         // no listen port, just dump counts now and then
+                       sleep( count_delay );
+                       rt_epcounts( ctx->rtable, ctx->my_name );
+               }
+               
+               free( fport );                                  // parinoid free and return
                return NULL;
        }
 
+       if( (rcv_fd = rmr_get_rcvfd( pvt_cx )) >= 0 ) {            // get the epoll fd for the rtg socket
+               if( rcv_fd < 0 ) {
+                       fprintf( stderr, "[WARN] cannot get epoll fd for rtg session; stats will generate only after update from rt manager\n" );
+               } else {
+                       if( (epfd = epoll_create1( 0 )) < 0 ) {
+                               fprintf( stderr, "[WARN] stats will generate only after rt manager update; unable to create epoll fd for rtg session: %s\n", strerror( errno ) );
+                               rcv_fd = -1;
+                       } else {
+                               epe.events = EPOLLIN;
+                               epe.data.fd = rcv_fd;
+
+                               if( epoll_ctl( epfd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 )  {
+                                       fprintf( stderr, "[WARN] stats will generate only after rt manager update; unable to init epoll_ctl: %s\n", strerror( errno ) );
+                                       rcv_fd = -1;
+                               }
+                       }
+               }
+       }
+
        if( DEBUG ) fprintf( stderr, "[DBUG] rtc thread is running and listening; listening for rtg conns on %s\n", port );
        free( fport );
 
        // future:  if we need to register with the rtg, then build a message and send it through a wormhole here
 
+       bump_freq = time( NULL ) + 300;                         // after 5 minutes we decrease the count frequency
        blabber = 0;
        while( 1 ) {                    // until the cows return, pigs fly, or somesuch event likely not to happen
-               if( raw_interface ) {
-                       msg = (rmr_mbuf_t *) rcv_payload( pvt_cx, msg );                // receive from non-RMr sender
-               } else {
-                       msg = rmr_rcv_msg( pvt_cx, msg );               // receive from an RMr sender
-               }
+               while( msg == NULL || msg->len <= 0 ) {                                                 // until we actually have something from the other side
+                       if( rcv_fd < 0 || epoll_wait( epfd, events, 1, 1000 ) > 0 )  {  // skip epoll if init failed, else block for max 1 sec
+                               if( raw_interface ) {
+                                       msg = (rmr_mbuf_t *) rcv_payload( pvt_cx, msg );                // receive from non-RMr sender
+                               } else {
+                                       msg = rmr_rcv_msg( pvt_cx, msg );               // receive from an RMr sender
+                               }
+                       } else {                                                                                                        // no msg, do extra tasks
+                               if( msg != NULL ) {                                                                             // if we were working with a message; ensure no len
+                                       msg->len = 0;
+                                       msg->state = RMR_ERR_TIMEOUT;
+                               }
+                       }
 
-               if( vfd >= 0 ) {                                                        // if changed since last go round
-                       wbuf[0] = 0;
-                       lseek( vfd, 0, 0 );
-                       read( vfd, wbuf, 10 );
-                       vlevel = atoi( wbuf );
+                       if( time( NULL ) > blabber  ) {
+                               vlevel = refresh_vlevel( vfd );
+                               if( vlevel >= 0 ) {                                                                             // allow it to be forced off with -n in verbose file
+                                       blabber = time( NULL ) + count_delay;                           // set next time to blabber, then do so
+                                       if( blabber > bump_freq ) {
+                                               count_delay = 300;
+                                       }
+                                       rt_epcounts( ctx->rtable, ctx->my_name );
+                               }
+                       }
                }
 
+               vlevel = refresh_vlevel( vfd );                 // ensure it's fresh when we get a message
+
                if( msg != NULL && msg->len > 0 ) {
                        payload = msg->payload;
                        mlen = msg->len;                                        // usable bytes in the payload
@@ -212,11 +274,8 @@ static void* rtc( void* vctx ) {
                        if( ctx->shutdown ) {           // mostly for testing, but allows user app to close us down if rmr_*() function sets this
                                break;
                        }
-               } else {
-                       if( time( NULL ) > blabber  ) {
-                               fprintf( stderr, "[WRN] rmr_rtc: nil buffer, or 0 len msg, received from rtg\n" );
-                               blabber = time( NULL ) + 180;                   // limit to 1 every 3 min or so
-                       }
+
+                       msg->len = 0;                           // force back into the listen loop
                }
        }