1 // : vi ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: rt_collector.c
23 Abstract: The route table collector is started as a separate pthread and
24 is responsible for listening for route table updates from a
25 route manager or route table generator process.
27 Author: E. Scott Daniels
28 Date: 29 November 2018 (extracted to common 13 March 2019)
31 #ifndef _rt_collector_c
32 #define _rt_collector_c
41 #include <sys/types.h>
45 static int refresh_vlevel( int vfd ) {
49 if( vfd >= 0 ) { // if file is open, read current value
52 read( vfd, rbuf, 10 );
53 vlevel = atoi( rbuf );
61 A side thread which opens a socket and subscribes to a routing table generator.
62 It may do other things along the way (latency measurements?).
64 The pointer is a pointer to the context.
66 Listens for records from the route table generation publisher, expecting
67 one of the following, newline terminated, ASCII records:
68 rte|msg-type||]name:port,name:port,...;name:port,... // route table entry with one or more groups of endpoints
69 new|start // start of new table
70 new|end // end of new table; complete
72 Name must be a host name which can be looked up via gethostbyname() (DNS).
74 Multiple endpoints (name:port) may be given separated by a comma; an endpoint is selected using round robin
75 for each message of the type that is sent.
77 Multiple endpoint groups can be given as a comma separated list of endpoints, separated by semicolons:
78 group1n1:port,group1n2:port,group1n3:port;group2n1:port,group2n2:port
80 If multiple groups are given, when send() is called for the cooresponding message type,
81 the message will be sent to one endpoint in each group.
83 msg-type is the numeric message type (e.g. 102). If it is given as n,name then it is assumed
84 that the entry applies only to the instance running with the hostname 'name.'
86 Buffers received from the route table generator can contain multiple newline terminated
87 records, but each buffer must be less than 4K in length, and the last record in a
88 buffer may NOT be split across buffers.
91 In addition to the primary task of getting, vetting, and installing a new route table, or
92 updates to the existing table, this thread will periodically cause the send counts for each
93 endpoint known to be written to standard error. The frequency is once every 180 seconds, and
94 more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
96 static void* rtc( void* vctx ) {
97 uta_ctx_t* ctx; // context user has -- where we pin the route table
98 uta_ctx_t* pvt_cx; // private context for session with rtg
99 rmr_mbuf_t* msg = NULL; // message from rtg
100 char* payload; // payload in the message
102 size_t clen; // length to copy and mark
103 char* port; // a port number we listen/connect to
104 char* fport; // pointer to the real buffer to free
105 size_t buf_size; // nng needs var pointer not just size?
106 char* nextr; // pointer at next record in the message
107 char* curr; // current record
109 long blabber = 0; // time of last blabber so we don't flood if rtg goes bad
110 int cstate = -1; // connection state to rtg
111 int state; // processing state of some nng function
115 int pbuf_size = 0; // number allocated in pbuf
117 int raw_interface = 1; // rtg is using raw NNG/Nano not RMr to send updates
118 int vfd = -1; // verbose file des if we have one
119 int vlevel = 0; // how chatty we should be 0== no nattering allowed
121 int epfd = -1; // fd for epoll so we can multi-task
122 struct epoll_event events[1]; // list of events to give to epoll; we only have one we care about
123 struct epoll_event epe; // event definition for event to listen to
124 int rcv_fd = -1; // pollable file des from NNG to use for timeout
125 int count_delay = 30; // number of seconds between writing count info; initially every 30s
126 int bump_freq = 0; // time at which we will bump count frequency to every 5 minutes
129 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
130 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
134 if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
135 vfd = open( eptr, O_RDONLY );
136 vlevel = refresh_vlevel( vfd );
139 read_static_rt( ctx, vlevel ); // seed the route table if one provided
141 if( (port = getenv( ENV_RTG_PORT )) == NULL || ! *port ) { // port we need to open to listen for RTG connections
142 port = strdup( DEF_RTG_PORT );
144 port = strdup( port );
147 if( (curr = getenv( ENV_RTG_RAW )) != NULL ) {
148 raw_interface = atoi( curr ) > 0; // if > 0 we assume that rtg messages are NOT coming from an RMr based process
151 fport = port; // must hold to free
153 ntoks = uta_tokenise( port, tokens, 120, ':' ); // assume tcp:port, but it could be port or old style host:port
156 port = tokens[0]; // just the port
160 port = tokens[1]; // tcp:port or :port
164 port = DEF_RTG_PORT; // this shouldn't happen, but parnioia is good
168 if( (pvt_cx = init( port, MAX_RTG_MSG_SZ, FL_NOTHREAD )) == NULL ) { // open a private context
169 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
171 while( TRUE ) { // no listen port, just dump counts now and then
172 sleep( count_delay );
173 rt_epcounts( ctx->rtable, ctx->my_name );
176 free( fport ); // parinoid free and return
180 if( (rcv_fd = rmr_get_rcvfd( pvt_cx )) >= 0 ) { // get the epoll fd for the rtg socket
182 rmr_vlog( RMR_VL_WARN, "cannot get epoll fd for rtg session; stats will generate only after update from rt manager\n" );
184 if( (epfd = epoll_create1( 0 )) < 0 ) {
185 rmr_vlog( RMR_VL_WARN, "stats will generate only after rt manager update; unable to create epoll fd for rtg session: %s\n", strerror( errno ) );
188 epe.events = EPOLLIN;
189 epe.data.fd = rcv_fd;
191 if( epoll_ctl( epfd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 ) {
192 rmr_vlog( RMR_VL_WARN, "stats will generate only after rt manager update; unable to init epoll_ctl: %s\n", strerror( errno ) );
199 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rtc thread is running and listening; listening for rtg conns on %s\n", port );
202 // future: if we need to register with the rtg, then build a message and send it through a wormhole here
204 bump_freq = time( NULL ) + 300; // after 5 minutes we decrease the count frequency
206 while( 1 ) { // until the cows return, pigs fly, or somesuch event likely not to happen
207 while( msg == NULL || msg->len <= 0 ) { // until we actually have something from the other side
208 if( rcv_fd < 0 || epoll_wait( epfd, events, 1, 1000 ) > 0 ) { // skip epoll if init failed, else block for max 1 sec
209 if( raw_interface ) {
210 msg = (rmr_mbuf_t *) rcv_payload( pvt_cx, msg ); // receive from non-RMr sender
212 msg = rmr_rcv_msg( pvt_cx, msg ); // receive from an RMr sender
214 } else { // no msg, do extra tasks
215 if( msg != NULL ) { // if we were working with a message; ensure no len
217 msg->state = RMR_ERR_TIMEOUT;
221 if( time( NULL ) > blabber ) {
222 vlevel = refresh_vlevel( vfd );
223 if( vlevel >= 0 ) { // allow it to be forced off with -n in verbose file
224 blabber = time( NULL ) + count_delay; // set next time to blabber, then do so
225 if( blabber > bump_freq ) {
228 rt_epcounts( ctx->rtable, ctx->my_name );
233 vlevel = refresh_vlevel( vfd ); // ensure it's fresh when we get a message
235 if( msg != NULL && msg->len > 0 ) {
236 payload = msg->payload;
237 mlen = msg->len; // usable bytes in the payload
239 rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes (%s)\n", (int) mlen, msg->payload );
241 if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes\n", (int) mlen );
244 if( pbuf_size <= mlen ) {
251 pbuf_size = mlen * 2;
253 pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
255 memcpy( pbuf, payload, mlen );
256 pbuf[mlen] = 0; // don't depend on sender making this a legit string
259 while( curr ) { // loop over each record in the buffer
260 nextr = strchr( curr, '\n' ); // allow multiple newline records, find end of current and mark
267 rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr );
269 parse_rt_rec( ctx, curr, vlevel ); // parse record and add to in progress table
274 if( ctx->shutdown ) { // mostly for testing, but allows user app to close us down if rmr_*() function sets this
278 msg->len = 0; // force back into the listen loop
282 return NULL; // unreachable, but some compilers don't see that and complain.