X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fcommon%2Fsrc%2Frtc_static.c;h=58fc9408a8bada42a1965ebdd7ec4edba9cdbc61;hb=0b79fc264eea2591ad6f645d0c90cc378ea5603b;hp=013dc8d2f85ea43198fab7fc5b57db8471ebe24c;hpb=68c1ab2191d9959fde0bd275a560f7c9cf6df485;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/common/src/rtc_static.c b/src/rmr/common/src/rtc_static.c index 013dc8d..58fc940 100644 --- a/src/rmr/common/src/rtc_static.c +++ b/src/rmr/common/src/rtc_static.c @@ -42,6 +42,20 @@ #include #include +static int refresh_vlevel( int vfd ) { + int vlevel = 0; + char rbuf[128]; + + if( vfd >= 0 ) { // if file is open, read current value + rbuf[0] = 0; + lseek( vfd, 0, 0 ); + read( vfd, rbuf, 10 ); + vlevel = atoi( rbuf ); + } + + return vlevel; +} + /* Route Table Collector A side thread which opens a socket and subscribes to a routing table generator. @@ -71,8 +85,13 @@ Buffers received from the route table generator can contain multiple newline terminated records, but each buffer must be less than 4K in length, and the last record in a - buffere may NOT be split across buffers. + buffer may NOT be split across buffers. + Other chores: + In addition to the primary task of getting, vetting, and installing a new route table, or + updates to the existing table, this thread will periodically cause the send counts for each + endpoint known to be written to standard error. The frequency is once every 180 seconds, and + more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0. */ static void* rtc( void* vctx ) { uta_ctx_t* ctx; // context user has -- where we pin the route table @@ -99,6 +118,13 @@ static void* rtc( void* vctx ) { int vfd = -1; // verbose file des if we have one int vlevel = 0; // how chatty we should be 0== no nattering allowed char* eptr; + int epfd = -1; // fd for epoll so we can multi-task + struct epoll_event events[1]; // list of events to give to epoll; we only have one we care about + struct epoll_event epe; // event definition for event to listen to + int rcv_fd = -1; // pollable file des from NNG to use for timeout + int count_delay = 30; // number of seconds between writing count info; initially every 30s + int bump_freq = 0; // time at which we will bump count frequency to every 5 minutes + if( (ctx = (uta_ctx_t *) vctx) == NULL ) { fprintf( stderr, "[CRI] rmr_rtc: internal mishap: context passed in was nil\n" ); @@ -107,13 +133,8 @@ static void* rtc( void* vctx ) { if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) { vfd = open( eptr, O_RDONLY ); - if( vfd >= 0 ) { - wbuf[0] = 0; - lseek( vfd, 0, 0 ); - read( vfd, wbuf, 10 ); - vlevel = atoi( wbuf ); - } - } + vlevel = refresh_vlevel( vfd ); + } read_static_rt( ctx, vlevel ); // seed the route table if one provided @@ -146,30 +167,71 @@ static void* rtc( void* vctx ) { if( (pvt_cx = init( port, MAX_RTG_MSG_SZ, FL_NOTHREAD )) == NULL ) { // open a private context fprintf( stderr, "[CRI] rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" ); - free( fport ); + + while( TRUE ) { // no listen port, just dump counts now and then + sleep( count_delay ); + rt_epcounts( ctx->rtable, ctx->my_name ); + } + + free( fport ); // parinoid free and return return NULL; } + if( (rcv_fd = rmr_get_rcvfd( pvt_cx )) >= 0 ) { // get the epoll fd for the rtg socket + if( rcv_fd < 0 ) { + fprintf( stderr, "[WARN] cannot get epoll fd for rtg session; stats will generate only after update from rt manager\n" ); + } else { + if( (epfd = epoll_create1( 0 )) < 0 ) { + fprintf( stderr, "[WARN] stats will generate only after rt manager update; unable to create epoll fd for rtg session: %s\n", strerror( errno ) ); + rcv_fd = -1; + } else { + epe.events = EPOLLIN; + epe.data.fd = rcv_fd; + + if( epoll_ctl( epfd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 ) { + fprintf( stderr, "[WARN] stats will generate only after rt manager update; unable to init epoll_ctl: %s\n", strerror( errno ) ); + rcv_fd = -1; + } + } + } + } + if( DEBUG ) fprintf( stderr, "[DBUG] rtc thread is running and listening; listening for rtg conns on %s\n", port ); free( fport ); // future: if we need to register with the rtg, then build a message and send it through a wormhole here + bump_freq = time( NULL ) + 300; // after 5 minutes we decrease the count frequency blabber = 0; while( 1 ) { // until the cows return, pigs fly, or somesuch event likely not to happen - if( raw_interface ) { - msg = (rmr_mbuf_t *) rcv_payload( pvt_cx, msg ); // receive from non-RMr sender - } else { - msg = rmr_rcv_msg( pvt_cx, msg ); // receive from an RMr sender - } + while( msg == NULL || msg->len <= 0 ) { // until we actually have something from the other side + if( rcv_fd < 0 || epoll_wait( epfd, events, 1, 1000 ) > 0 ) { // skip epoll if init failed, else block for max 1 sec + if( raw_interface ) { + msg = (rmr_mbuf_t *) rcv_payload( pvt_cx, msg ); // receive from non-RMr sender + } else { + msg = rmr_rcv_msg( pvt_cx, msg ); // receive from an RMr sender + } + } else { // no msg, do extra tasks + if( msg != NULL ) { // if we were working with a message; ensure no len + msg->len = 0; + msg->state = RMR_ERR_TIMEOUT; + } + } - if( vfd >= 0 ) { // if changed since last go round - wbuf[0] = 0; - lseek( vfd, 0, 0 ); - read( vfd, wbuf, 10 ); - vlevel = atoi( wbuf ); + if( time( NULL ) > blabber ) { + vlevel = refresh_vlevel( vfd ); + if( vlevel >= 0 ) { // allow it to be forced off with -n in verbose file + blabber = time( NULL ) + count_delay; // set next time to blabber, then do so + if( blabber > bump_freq ) { + count_delay = 300; + } + rt_epcounts( ctx->rtable, ctx->my_name ); + } + } } + vlevel = refresh_vlevel( vfd ); // ensure it's fresh when we get a message + if( msg != NULL && msg->len > 0 ) { payload = msg->payload; mlen = msg->len; // usable bytes in the payload @@ -212,11 +274,8 @@ static void* rtc( void* vctx ) { if( ctx->shutdown ) { // mostly for testing, but allows user app to close us down if rmr_*() function sets this break; } - } else { - if( time( NULL ) > blabber ) { - fprintf( stderr, "[WRN] rmr_rtc: nil buffer, or 0 len msg, received from rtg\n" ); - blabber = time( NULL ) + 180; // limit to 1 every 3 min or so - } + + msg->len = 0; // force back into the listen loop } }