Add ability to track send counts for an endpoint
[ric-plt/lib/rmr.git] / src / rmr / common / src / rtc_static.c
index 013dc8d..cce8ddc 100644 (file)
        records, but each buffer must be less than 4K in length, and the last record in a
        buffere may NOT be split across buffers.
 
+       Other chores:
+       In addition to the primary task of getting, vetting, and installing a new route table, or
+       updates to the existing table, this thread will periodically cause the send counts for each
+       endpoint known to be written to standard error. The frequency is once every 180 seconds, and
+       more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
 */
 static void* rtc( void* vctx ) {
        uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
@@ -99,6 +104,13 @@ static void* rtc( void* vctx ) {
        int             vfd = -1;                                       // verbose file des if we have one
        int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
        char*   eptr;
+       int             epfd = -1;                                      // fd for epoll so we can multi-task
+       struct  epoll_event events[1];          // list of events to give to epoll; we only have one we care about
+       struct  epoll_event epe;                        // event definition for event to listen to
+       int             rcv_fd = -1;                            // pollable file des from NNG to use for timeout
+       int             count_delay = 30;                       // number of seconds between writing count info; initially every 30s
+       int             bump_freq = 0;                          // time at which we will bump count frequency to every 5 minutes
+
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
                fprintf( stderr, "[CRI] rmr_rtc: internal mishap: context passed in was nil\n" );
@@ -113,7 +125,7 @@ static void* rtc( void* vctx ) {
                        read( vfd, wbuf, 10 );
                        vlevel = atoi( wbuf );
                }
-       }
+       }                
 
        read_static_rt( ctx, vlevel );                                          // seed the route table if one provided
 
@@ -146,24 +158,67 @@ static void* rtc( void* vctx ) {
 
        if( (pvt_cx = init( port, MAX_RTG_MSG_SZ, FL_NOTHREAD )) == NULL ) {                            // open a private context
                fprintf( stderr, "[CRI] rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
-               free( fport );
+
+               while( TRUE ) {                                                                                         // no listen port, just dump counts now and then
+                       sleep( count_delay );
+                       rt_epcounts( ctx->rtable, ctx->my_name );
+               }
+               
+               free( fport );                                  // parinoid free and return
                return NULL;
        }
 
+       if( (rcv_fd = rmr_get_rcvfd( pvt_cx )) >= 0 ) {            // get the epoll fd for the rtg socket
+               if( rcv_fd < 0 ) {
+                       fprintf( stderr, "[WARN] cannot get epoll fd for rtg session; stats will generate only after update from rt manager\n" );
+               } else {
+                       if( (epfd = epoll_create1( 0 )) < 0 ) {
+                               fprintf( stderr, "[WARN] stats will generate only after rt manager update; unable to create epoll fd for rtg session: %s\n", strerror( errno ) );
+                               rcv_fd = -1;
+                       } else {
+                               epe.events = EPOLLIN;
+                               epe.data.fd = rcv_fd;
+
+                               if( epoll_ctl( epfd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 )  {
+                                       fprintf( stderr, "[WARN] stats will generate only after rt manager update; unable to init epoll_ctl: %s\n", strerror( errno ) );
+                                       rcv_fd = -1;
+                               }
+                       }
+               }
+       }
+
        if( DEBUG ) fprintf( stderr, "[DBUG] rtc thread is running and listening; listening for rtg conns on %s\n", port );
        free( fport );
 
        // future:  if we need to register with the rtg, then build a message and send it through a wormhole here
 
+       bump_freq = time( NULL ) + 300;                         // after 5 minutes we decrease the count frequency
        blabber = 0;
        while( 1 ) {                    // until the cows return, pigs fly, or somesuch event likely not to happen
-               if( raw_interface ) {
-                       msg = (rmr_mbuf_t *) rcv_payload( pvt_cx, msg );                // receive from non-RMr sender
-               } else {
-                       msg = rmr_rcv_msg( pvt_cx, msg );               // receive from an RMr sender
+               while( msg == NULL || msg->len <= 0 ) {                                                 // until we actually have something from the other side
+                       if( rcv_fd < 0 || epoll_wait( epfd, events, 1, 1000 ) > 0 )  {  // skip epoll if init failed, else block for max 1 sec
+                               if( raw_interface ) {
+                                       msg = (rmr_mbuf_t *) rcv_payload( pvt_cx, msg );                // receive from non-RMr sender
+                               } else {
+                                       msg = rmr_rcv_msg( pvt_cx, msg );               // receive from an RMr sender
+                               }
+                       } else {                                                                                                        // no msg, do extra tasks
+                               if( msg != NULL ) {                                                                             // if we were working with a message; ensure no len
+                                       msg->len = 0;
+                                       msg->state = RMR_ERR_TIMEOUT;
+                               }
+                       }
+
+                       if( time( NULL ) > blabber  ) {
+                               blabber = time( NULL ) + count_delay;                                   // set next time to blabber, then do so
+                               if( blabber > bump_freq ) {
+                                       count_delay = 300;
+                               }
+                               rt_epcounts( ctx->rtable, ctx->my_name );
+                       }
                }
 
-               if( vfd >= 0 ) {                                                        // if changed since last go round
+               if( vfd >= 0 ) {                                                        // if file open, check for change to vlevel
                        wbuf[0] = 0;
                        lseek( vfd, 0, 0 );
                        read( vfd, wbuf, 10 );
@@ -212,11 +267,8 @@ static void* rtc( void* vctx ) {
                        if( ctx->shutdown ) {           // mostly for testing, but allows user app to close us down if rmr_*() function sets this
                                break;
                        }
-               } else {
-                       if( time( NULL ) > blabber  ) {
-                               fprintf( stderr, "[WRN] rmr_rtc: nil buffer, or 0 len msg, received from rtg\n" );
-                               blabber = time( NULL ) + 180;                   // limit to 1 every 3 min or so
-                       }
+
+                       msg->len = 0;                           // force back into the listen loop
                }
        }