// : vi ts=4 sw=4 noet :
/*
==================================================================================
- Copyright (c) 2019 Nokia
- Copyright (c) 2018-2019 AT&T Intellectual Property.
+ Copyright (c) 2019-2021 Nokia
+ Copyright (c) 2018-2021 AT&T Intellectual Property.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
// ------------------------------------------------------------------------------------------------
+/*
+ Opens the vlevel control file if needed and reads the vlevel from it.
+ The file is rewound if already open so that external updates are captured.
+ The current level is returnd; 0 on error.
+
+ The environment variable (ENV_VERBOSE_FILE) is used to supply the file to
+ open and read. If missing, we will try /tmp/rmr.v. We will try to open the file
+ on each call if not alrady open; this allows the value to be supplied after
+ start which helps debugging.
+
+ If close_file is true, then we will close the open vfd and return 0;
+*/
+extern int refresh_vlevel( int close_file ) {
+ static int vfd = -1;
+
+ char* eptr;
+ char wbuf[128]; // read buffer; MUST be 11 or greater
+ int vlevel = 0;
+
+ if( close_file ) {
+ if( vfd >= 0 ) {
+ close( vfd );
+ vfd = -1;
+ }
+ return 0;
+ }
+
+ if( vfd < 0 ) { // attempt to find/open on all calls if not open
+ if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
+ vfd = open( eptr, O_RDONLY );
+ } else {
+ vfd = open( "/tmp/rmr.v", O_RDONLY );
+ }
+ if( vfd < 0 ) {
+ return 0;
+ }
+ }
+
+ memset( wbuf, 0, sizeof( char ) * 11 ); // ensure what we read will be nil terminated
+ if( lseek( vfd, 0, SEEK_SET ) == 0 && read( vfd, wbuf, 10 ) > 0 ) {
+ vlevel = atoi( wbuf );
+ }
+
+ return vlevel;
+}
+
/*
Loop forever (assuming we're running in a pthread reading the static table
every minute or so.
static void* rtc_file( void* vctx ) {
uta_ctx_t* ctx; // context user has -- where we pin the route table
char* eptr;
- int vfd = -1; // verbose file des if we have one
int vlevel = 0; // how chatty we should be 0== no nattering allowed
char wbuf[256];
return NULL;
}
- if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
- vfd = open( eptr, O_RDONLY );
- }
-
ctx->flags |= CFL_NO_RTACK; // no attempt to ack when reading from a file
while( 1 ) {
- if( vfd >= 0 ) {
- wbuf[0] = 0;
- lseek( vfd, 0, 0 );
- read( vfd, wbuf, 10 );
- vlevel = atoi( wbuf );
- }
+ vlevel = refresh_vlevel( 0 );
+ read_static_rt( ctx, vlevel ); // refresh from the file
- read_static_rt( ctx, vlevel ); // seed the route table if one provided
+ if( ctx->shutdown != 0 ) { // allow for graceful termination and unit testing
+ refresh_vlevel( 1 ); // close the verbose file if open
+ return NULL;
+ }
- sleep( 60 );
+ if( ctx->rtable_ready ) {
+ sleep( 60 );
+ } else {
+ sleep( 1 ); // check every second until we have a good one
+ }
}
}
-static int refresh_vlevel( int vfd ) {
- int vlevel = 0;
- char rbuf[128];
+/*
+ Rtc_parse_msg parses a single message from the route manager. We allow multiple, newline terminated,
+ records in each message; it is required that the last record in the message be complete (we do not
+ reconstruct records split over multiple messages). For each record, we call the record parser
+ to parse and add the information to the table being built.
- if( vfd >= 0 ) { // if file is open, read current value
- rbuf[0] = 0;
- lseek( vfd, 0, 0 );
- read( vfd, rbuf, 10 );
- vlevel = atoi( rbuf );
- }
+ This function was broken from the main rtc() function in order to be able to unit test it. Without
+ this as a standalone funciton, it was impossible to simulate a message arriving on the RTC's private
+ context.
- return vlevel;
+ To reduce malloc/free cycles, we allocate a static work buffer and expand it when needed; in other
+ words, this is not thread safe but it shouldn't need to be.
+*/
+static void rtc_parse_msg( uta_ctx_t *ctx, uta_ctx_t* pvt_cx, rmr_mbuf_t* msg, int vlevel, int* flags ) {
+ static unsigned char* pbuf = NULL;
+ static int pbuf_size = 0;
+
+ unsigned char* payload;
+ unsigned char* curr;
+ unsigned char* nextr;
+ int mlen;
+
+ payload = msg->payload;
+ mlen = msg->len; // usable bytes in the payload
+
+ if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: received rt message type=%d len=%d\n", msg->mtype, (int) mlen );
+ switch( msg->mtype ) {
+ case RMRRM_TABLE_DATA:
+ if( (*flags & RTCFL_HAVE_UPDATE) == 0 ) {
+ *flags |= RTCFL_HAVE_UPDATE;
+ rmr_vlog( RMR_VL_INFO, "message flow from route manager starts\n" );
+ }
+
+ if( pbuf_size <= mlen ) {
+ if( pbuf ) {
+ free( pbuf );
+ }
+ if( mlen < 512 ) {
+ pbuf_size = 1024;
+ } else {
+ pbuf_size = mlen * 2;
+ }
+ pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
+ }
+ memcpy( pbuf, payload, mlen );
+ pbuf[mlen] = 0; // don't depend on sender making this a legit string
+ if( vlevel > 1 ) {
+ rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: rt message: (%s)\n", pbuf );
+ }
+
+ curr = pbuf;
+ while( curr ) { // loop over each record in the buffer
+ nextr = strchr( (char *) curr, '\n' ); // allow multiple newline records, find end of current and mark
+
+ if( nextr ) {
+ *(nextr++) = 0;
+ }
+
+ if( vlevel > 1 ) {
+ rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc_parse_msg: snarf_fd=%d processing (%s)\n", ctx ? ctx->snarf_rt_fd : -99, curr );
+ }
+ parse_rt_rec( ctx, pvt_cx, curr, vlevel, msg ); // parse record and add to in progress table; ack using rts to msg
+
+ curr = nextr;
+ }
+
+ msg->len = 0; // force back into the listen loop
+ break;
+
+ default:
+ rmr_vlog( RMR_VL_WARN, "rmr_rtc: invalid message type=%d len=%d\n", msg->mtype, (int) msg->len );
+ break;
+ }
}
/*
from the Route Manager, or opens a port and listens for Route Manager
to push table updates.
- It may do other things along the way (latency measurements, alarms,
+ It may do other things along the way (latency measurements, alarms,
respond to RMR pings, etc.).
The behaviour with respect to listening for Route Manager updates vs
the initiation of the connection and sending a request depends on the
value of the ENV_RTG_ADDR (RMR_RTG_SVC) environment variable. If
host:port, or IP:port, is given, then we assume that we make the connection
- and send a request for the table (request mode). If the variable is just
- a port, then we assume Route Manager will connect and push updates (original
+ and send a request for the table (request mode). If the variable is just
+ a port, then we assume Route Manager will connect and push updates (original
method).
If the variable is not defined, the default behaviour, in order to be
- backwards compatable, depends on the presence of the ENV_CTL_PORT
+ backwards compatable, depends on the presence of the ENV_CTL_PORT
(RMR_CTL_PORT) variable (new with the support for requesting a table).
-
+
ENV_CTL_PORT ENV_RTG_ADDR Behaviour
unset unset Open default CTL port (DEF_CTL_PORT) and
wait for Rt Mgr to push tables
used is the value set by ENV_CTL_PORT.
unset set As described above. The default control
- port (DEF_CTL_PORT) is used.
+ port (DEF_CTL_PORT) is used.
- When we are running in request mode, then we will send the RMR message
- RMRRM_REFRESH to this address (wormhole) as a request for the route manager
+ When we are running in request mode, then we will send the RMR message
+ RMRRM_REFRESH to this address (wormhole) as a request for the route manager
to send a new table. We will attempt to connect and send requests until
we have a table. Calls to rmr_ready() will report FALSE until a table is
loaded _unless_ a seed table was given.
- Route table information is expected to arrive on RMR messages with type
+ Route table information is expected to arrive on RMR messages with type
RMRRM_TABLE_DATA. There is NOT a specific message type for each possible
table record, so the payload is as it appears in the seed file or as
delivered in old versions. It may take several RMRRM_TABLE_DATA messages
uta_ctx_t* ctx; // context user has -- where we pin the route table
uta_ctx_t* pvt_cx; // private context for session with rtg
rmr_mbuf_t* msg = NULL; // message from rtg
- char* payload; // payload in the message
- size_t mlen;
+ route_table_t* rt; // the routing table that will be traversed to print statistics
char* my_port; // the port number that we will listen on (4561 has been the default for this)
char* rtg_addr; // host:port address of route table generator (route manager)
char* daddr; // duplicated rtg address string to parse/trash
size_t buf_size; // nng needs var pointer not just size?
- char* nextr; // pointer at next record in the message
- char* curr; // current record
- int i;
+ int i;
long blabber = 0; // time of last blabber so we don't flood if rtg goes bad
int cstate = -1; // connection state to rtg
int state; // processing state of some nng function
char* tokens[128];
char wbuf[128];
- char* pbuf = NULL;
- int pbuf_size = 0; // number allocated in pbuf
int ntoks;
- int vfd = -1; // verbose file des if we have one
int vlevel = 0; // how chatty we should be 0== no nattering allowed
char* eptr;
int epfd = -1; // fd for epoll so we can multi-task
int count_delay = 30; // number of seconds between writing count info; initially every 30s
int bump_freq = 0; // time at which we will bump count frequency to every 5 minutes
int flags = 0;
+ int rt_req_freq = DEF_RTREQ_FREQ; // request frequency (sec) when wanting a new table
+ int nxt_rt_req = 0; // time of next request
if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
return NULL;
}
- if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
- vfd = open( eptr, O_RDONLY );
- vlevel = refresh_vlevel( vfd );
+ vlevel = refresh_vlevel( 0 );
+
+ if( (eptr = getenv( ENV_RTREQ_FREA )) != NULL ) {
+ rt_req_freq = atoi( eptr );
+ if( rt_req_freq < 1 || rt_req_freq > 300 ) {
+ rt_req_freq = DEF_RTREQ_FREQ;
+ rmr_vlog( RMR_VL_WARN, "rmr_rtc: RT request frequency (%d) out of range (1-300), using default (%d)\n", rt_req_freq, DEF_RTREQ_FREQ );
+ }
}
+ rmr_vlog( RMR_VL_INFO, "rmr_rtc: RT request frequency set to: %d seconds", rt_req_freq );
- ctx->flags |= CFL_NO_RTACK; // don't ack when reading from a file
+ ctx->flags |= CFL_NO_RTACK; // don't ack when reading from a file
read_static_rt( ctx, vlevel ); // seed the route table if one provided
ctx->flags &= ~CFL_NO_RTACK;
+ ctx->flags &= ~CFL_FULLRT; // even though rmr-ready goes true, the seed doesn't count as a full RT from route generator
my_port = getenv( ENV_CTL_PORT ); // default port to listen on (likely 4561)
if( my_port == NULL || ! *my_port ) { // if undefined, then go with default
- my_port = DEF_CTL_PORT;
+ my_port = DEF_CTL_PORT;
daddr = DEF_CTL_PORT; // backwards compat; if ctl port not hard defined, default is to listen
} else {
daddr = DEF_RTG_WK_ADDR; // if ctl port is defined, then default changes to connecting to well known RM addr
flags |= RTCFL_HAVE_UPDATE; // and signal not to try to request an update
my_port = tokens[1];
} else {
- // rtg_addr points at rt mgr address and my port set from env or default stands as is
+ // rtg_addr points at rt mgr address and my port set from env or default stands as is
}
break;
}
ctx->rtg_whid = -1;
+ cycle_snarfed_rt( ctx ); // cause the nrt to be opened
+
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rtc thread is running and listening; listening for rtg conns on %s\n", my_port );
bump_freq = time( NULL ) + 300; // after 5 minutes we decrease the count frequency
blabber = 0;
while( 1 ) { // until the cows return, pigs fly, or somesuch event likely not to happen
while( msg == NULL || msg->len <= 0 ) { // until we actually have something from the other side
- if( (flags & RTCFL_HAVE_UPDATE) == 0 ) { // no route table updated from rt mgr; request one
+ if( (flags & RTCFL_HAVE_UPDATE) == 0 && time( NULL ) >= nxt_rt_req ) { // no route table updated from rt mgr; request one
if( ctx->rtg_whid < 0 ) {
ctx->rtg_whid = rmr_wh_open( pvt_cx, rtg_addr );
}
send_update_req( pvt_cx, ctx );
+ nxt_rt_req = time( NULL ) + rt_req_freq;
}
msg = rmr_torcv_msg( pvt_cx, msg, 1000 );
if( time( NULL ) > blabber ) {
- vlevel = refresh_vlevel( vfd );
+ vlevel = refresh_vlevel( 0 );
+ blabber = time( NULL ) + count_delay; // set next time to blabber, then do so
+ if( blabber > bump_freq ) {
+ count_delay = 300;
+ }
if( vlevel >= 0 ) { // allow it to be forced off with -n in verbose file
- blabber = time( NULL ) + count_delay; // set next time to blabber, then do so
- if( blabber > bump_freq ) {
- count_delay = 300;
- }
- rt_epcounts( ctx->rtable, ctx->my_name );
+ rt = get_rt( ctx ); // get active route table and up ref count
+ rt_epcounts( rt, ctx->my_name );
+ release_rt( ctx, rt ); // dec safely the ref counter
}
}
- }
-
- vlevel = refresh_vlevel( vfd ); // ensure it's fresh when we get a message
- if( msg != NULL && msg->len > 0 ) {
- payload = msg->payload;
- mlen = msg->len; // usable bytes in the payload
- if( vlevel > 1 ) {
- rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: received rt message type=%d len=%d bytes (%s)\n", msg->mtype, (int) mlen, msg->payload );
- } else {
- if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: received rt message type=%d len=%d\n", msg->mtype, (int) mlen );
+ if( ctx->shutdown != 0 ) {
+ break; // mostly for unit test, but allows a forced stop
}
- switch( msg->mtype ) {
- case RMRRM_TABLE_DATA:
- if( (flags & RTCFL_HAVE_UPDATE) == 0 ) {
- flags |= RTCFL_HAVE_UPDATE;
- rmr_vlog( RMR_VL_INFO, "message flow from route manager starts\n" );
- }
-
- if( pbuf_size <= mlen ) {
- if( pbuf ) {
- free( pbuf );
- }
- if( mlen < 512 ) {
- pbuf_size = 512;
- } else {
- pbuf_size = mlen * 2;
- }
- pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
- }
- memcpy( pbuf, payload, mlen );
- pbuf[mlen] = 0; // don't depend on sender making this a legit string
-
- curr = pbuf;
- while( curr ) { // loop over each record in the buffer
- nextr = strchr( curr, '\n' ); // allow multiple newline records, find end of current and mark
-
- if( nextr ) {
- *(nextr++) = 0;
- }
-
- if( vlevel > 1 ) {
- rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr );
- }
- parse_rt_rec( ctx, pvt_cx, curr, vlevel ); // parse record and add to in progress table
-
- curr = nextr;
- }
+ // extra housekeeping chores can be added here...
+ alarm_if_drops( ctx, pvt_cx ); // send an alarm if message are dropping, clear if we set one and thtings are better
+ }
- msg->len = 0; // force back into the listen loop
- break;
+ vlevel = refresh_vlevel( 0 ); // ensure it's fresh when we get a message
- default:
- rmr_vlog( RMR_VL_WARN, "rmr_rtc: invalid message type=%d len=%d\n", msg->mtype, (int) msg->len );
- break;
- }
+ if( msg != NULL && msg->len > 0 ) {
+ rtc_parse_msg( ctx, pvt_cx, msg, vlevel, &flags );
}
if( ctx->shutdown ) { // mostly for testing, but allows user app to close us down if rmr_*() function sets this
}
#ifndef SI95_BUILD
-// this is nng specific inas much as we allow raw (non-RMR) messages
+// this is nng specific inas much as we allow raw (non-RMR) messages
/*
NOTE: This is the original rtc code when we supported "raw" nano/nng messages
uta_ctx_t* ctx; // context user has -- where we pin the route table
uta_ctx_t* pvt_cx; // private context for session with rtg
rmr_mbuf_t* msg = NULL; // message from rtg
+ route_table_t* rt; // the routing table that will be traversed to print statistics
char* payload; // payload in the message
size_t mlen;
char* port; // a port number we listen/connect to
size_t buf_size; // nng needs var pointer not just size?
char* nextr; // pointer at next record in the message
char* curr; // current record
- int i;
+ int i;
long blabber = 0; // time of last blabber so we don't flood if rtg goes bad
int cstate = -1; // connection state to rtg
int state; // processing state of some nng function
int pbuf_size = 0; // number allocated in pbuf
int ntoks;
int raw_interface = 1; // rtg is using raw NNG/Nano not RMr to send updates
- int vfd = -1; // verbose file des if we have one
int vlevel = 0; // how chatty we should be 0== no nattering allowed
char* eptr;
int epfd = -1; // fd for epoll so we can multi-task
return NULL;
}
- if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
- vfd = open( eptr, O_RDONLY );
- vlevel = refresh_vlevel( vfd );
- }
-
+ vlevel = refresh_vlevel( 0 );
read_static_rt( ctx, vlevel ); // seed the route table if one provided
if( (port = getenv( ENV_RTG_PORT )) == NULL || ! *port ) { // port we need to open to listen for RTG connections
port = strdup( port );
}
-/*
- this test is now done in init and this function is started _only_ if the value was 1
- if( (curr = getenv( ENV_RTG_RAW )) != NULL ) {
- raw_interface = atoi( curr ) > 0; // if > 0 we assume that rtg messages are NOT coming from an RMr based process
- }
-*/
-
fport = port; // must hold to free
ntoks = uta_tokenise( port, tokens, 120, ':' ); // assume tcp:port, but it could be port or old style host:port
sleep( count_delay );
rt_epcounts( ctx->rtable, ctx->my_name );
}
-
+
free( fport ); // parinoid free and return
return NULL;
}
}
if( time( NULL ) > blabber ) {
- vlevel = refresh_vlevel( vfd );
+ vlevel = refresh_vlevel( 0 );
if( vlevel >= 0 ) { // allow it to be forced off with -n in verbose file
blabber = time( NULL ) + count_delay; // set next time to blabber, then do so
if( blabber > bump_freq ) {
count_delay = 300;
}
- rt_epcounts( ctx->rtable, ctx->my_name );
+ rt = get_rt( ctx ); // get active route table and up ref count
+ rt_epcounts( rt, ctx->my_name );
+ release_rt( ctx, rt ); // dec safely the ref counter
}
}
+
+ alarm_if_drops( ctx ); // send an alarm if message are dropping, clear if we set one and thtings are better
}
- vlevel = refresh_vlevel( vfd ); // ensure it's fresh when we get a message
+ vlevel = refresh_vlevel( 0 ); // ensure it's fresh when we get a message
if( msg != NULL && msg->len > 0 ) {
payload = msg->payload;
rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr );
}
if( raw_interface ) {
- parse_rt_rec( ctx, NULL, curr, vlevel ); // nil pvt to parser as we can't ack messages
+ parse_rt_rec( ctx, NULL, curr, vlevel, NULL ); // nil pvt to parser as we can't ack messages
} else {
- parse_rt_rec( ctx, pvt_cx, curr, vlevel ); // parse record and add to in progress table
+ parse_rt_rec( ctx, pvt_cx, curr, vlevel, msg ); // parse record and add to in progress table
}
curr = nextr;