records, but each buffer must be less than 4K in length, and the last record in a
buffere may NOT be split across buffers.
+ Other chores:
+ In addition to the primary task of getting, vetting, and installing a new route table, or
+ updates to the existing table, this thread will periodically cause the send counts for each
+ endpoint known to be written to standard error. The frequency is once every 180 seconds, and
+ more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
*/
static void* rtc( void* vctx ) {
uta_ctx_t* ctx; // context user has -- where we pin the route table
int vfd = -1; // verbose file des if we have one
int vlevel = 0; // how chatty we should be 0== no nattering allowed
char* eptr;
+ int epfd = -1; // fd for epoll so we can multi-task
+ struct epoll_event events[1]; // list of events to give to epoll; we only have one we care about
+ struct epoll_event epe; // event definition for event to listen to
+ int rcv_fd = -1; // pollable file des from NNG to use for timeout
+ int count_delay = 30; // number of seconds between writing count info; initially every 30s
+ int bump_freq = 0; // time at which we will bump count frequency to every 5 minutes
+
if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
fprintf( stderr, "[CRI] rmr_rtc: internal mishap: context passed in was nil\n" );
read( vfd, wbuf, 10 );
vlevel = atoi( wbuf );
}
- }
+ }
read_static_rt( ctx, vlevel ); // seed the route table if one provided
if( (pvt_cx = init( port, MAX_RTG_MSG_SZ, FL_NOTHREAD )) == NULL ) { // open a private context
fprintf( stderr, "[CRI] rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
- free( fport );
+
+ while( TRUE ) { // no listen port, just dump counts now and then
+ sleep( count_delay );
+ rt_epcounts( ctx->rtable, ctx->my_name );
+ }
+
+ free( fport ); // parinoid free and return
return NULL;
}
+ if( (rcv_fd = rmr_get_rcvfd( pvt_cx )) >= 0 ) { // get the epoll fd for the rtg socket
+ if( rcv_fd < 0 ) {
+ fprintf( stderr, "[WARN] cannot get epoll fd for rtg session; stats will generate only after update from rt manager\n" );
+ } else {
+ if( (epfd = epoll_create1( 0 )) < 0 ) {
+ fprintf( stderr, "[WARN] stats will generate only after rt manager update; unable to create epoll fd for rtg session: %s\n", strerror( errno ) );
+ rcv_fd = -1;
+ } else {
+ epe.events = EPOLLIN;
+ epe.data.fd = rcv_fd;
+
+ if( epoll_ctl( epfd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 ) {
+ fprintf( stderr, "[WARN] stats will generate only after rt manager update; unable to init epoll_ctl: %s\n", strerror( errno ) );
+ rcv_fd = -1;
+ }
+ }
+ }
+ }
+
if( DEBUG ) fprintf( stderr, "[DBUG] rtc thread is running and listening; listening for rtg conns on %s\n", port );
free( fport );
// future: if we need to register with the rtg, then build a message and send it through a wormhole here
+ bump_freq = time( NULL ) + 300; // after 5 minutes we decrease the count frequency
blabber = 0;
while( 1 ) { // until the cows return, pigs fly, or somesuch event likely not to happen
- if( raw_interface ) {
- msg = (rmr_mbuf_t *) rcv_payload( pvt_cx, msg ); // receive from non-RMr sender
- } else {
- msg = rmr_rcv_msg( pvt_cx, msg ); // receive from an RMr sender
+ while( msg == NULL || msg->len <= 0 ) { // until we actually have something from the other side
+ if( rcv_fd < 0 || epoll_wait( epfd, events, 1, 1000 ) > 0 ) { // skip epoll if init failed, else block for max 1 sec
+ if( raw_interface ) {
+ msg = (rmr_mbuf_t *) rcv_payload( pvt_cx, msg ); // receive from non-RMr sender
+ } else {
+ msg = rmr_rcv_msg( pvt_cx, msg ); // receive from an RMr sender
+ }
+ } else { // no msg, do extra tasks
+ if( msg != NULL ) { // if we were working with a message; ensure no len
+ msg->len = 0;
+ msg->state = RMR_ERR_TIMEOUT;
+ }
+ }
+
+ if( time( NULL ) > blabber ) {
+ blabber = time( NULL ) + count_delay; // set next time to blabber, then do so
+ if( blabber > bump_freq ) {
+ count_delay = 300;
+ }
+ rt_epcounts( ctx->rtable, ctx->my_name );
+ }
}
- if( vfd >= 0 ) { // if changed since last go round
+ if( vfd >= 0 ) { // if file open, check for change to vlevel
wbuf[0] = 0;
lseek( vfd, 0, 0 );
read( vfd, wbuf, 10 );
if( ctx->shutdown ) { // mostly for testing, but allows user app to close us down if rmr_*() function sets this
break;
}
- } else {
- if( time( NULL ) > blabber ) {
- fprintf( stderr, "[WRN] rmr_rtc: nil buffer, or 0 len msg, received from rtg\n" );
- blabber = time( NULL ) + 180; // limit to 1 every 3 min or so
- }
+
+ msg->len = 0; // force back into the listen loop
}
}