1 // : vi ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019-2020 Nokia
5 Copyright (c) 2018-2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: rtc_si_static.c
23 Abstract: The route table collector is started as a separate pthread and
24 is responsible for listening for route table updates from a
25 route manager or route table generator process.
27 This comes from the common src and may be moved back there once
28 it is not necessary to support raw sessions (all route table
29 gen messages are received over rmr channel).
31 Author: E. Scott Daniels
32 Date: 29 November 2018 (extracted to common 13 March 2019)
33 Imported to si base 17 Jan 2020.
37 #ifndef _rtc_si_staic_c
38 #define _rtc_si_staic_c
46 #include <sys/types.h>
51 Loop forever (assuming we're running in a pthread reading the static table
// rtc_file: thread entry point that seeds the route table from a static table via read_static_rt().
// vctx: the uta_ctx_t context, passed as void*; a nil value is logged at critical level.
// NOTE(review): this view is missing lines of the function (declarations of eptr and wbuf, the
// nil-context exit, everything after the read_static_rt() call); only visible statements are described.
54 static void* rtc_file( void* vctx ) {
55 uta_ctx_t* ctx; // context user has -- where we pin the route table
57 int vfd = -1; // verbose file des if we have one
58 int vlevel = 0; // how chatty we should be 0== no nattering allowed
62 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
63 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
// the verbose level comes from the file named by the ENV_VERBOSE_FILE environment variable, when it is set
67 if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
68 vfd = open( eptr, O_RDONLY );
// up to 10 bytes are read and converted with atoi(); the result of read() is discarded on this line
// NOTE(review): termination of wbuf before atoi() is not visible in this view -- confirm
75 read( vfd, wbuf, 10 );
76 vlevel = atoi( wbuf );
79 read_static_rt( ctx, vlevel ); // seed the route table if one provided
// refresh_vlevel: re-read the verbosity level from an already open verbose-file descriptor.
// vfd: the descriptor; a negative value means no file is open and the read is skipped.
// NOTE(review): the declarations of rbuf and vlevel, any repositioning of the descriptor and the
// return statement are not visible in this view; the int result is presumably vlevel -- confirm.
86 static int refresh_vlevel( int vfd ) {
90 if( vfd >= 0 ) { // if file is open, read current value
93 read( vfd, rbuf, 10 ); // at most 10 bytes; the result of read() is discarded on this line
94 vlevel = atoi( rbuf ); // NOTE(review): termination of rbuf before atoi() is not visible here -- confirm
101 Route Table Collector
102 A side thread which opens a socket and subscribes to a routing table generator.
103 It may do other things along the way (latency measurements?).
105 The pointer is a pointer to the context.
107 Listens for records from the route table generation publisher, expecting
108 one of the following, newline terminated, ASCII records:
109 rte|msg-type[,name]|name:port,name:port,...;name:port,... // route table entry with one or more groups of endpoints
110 new|start // start of new table
111 new|end // end of new table; complete
113 Name must be a host name which can be looked up via gethostbyname() (DNS).
115 Multiple endpoints (name:port) may be given separated by a comma; an endpoint is selected using round robin
116 for each message of the type that is sent.
118 Multiple endpoint groups can be given as a comma separated list of endpoints, separated by semicolons:
119 group1n1:port,group1n2:port,group1n3:port;group2n1:port,group2n2:port
121 If multiple groups are given, when send() is called for the corresponding message type,
122 the message will be sent to one endpoint in each group.
124 msg-type is the numeric message type (e.g. 102). If it is given as n,name then it is assumed
125 that the entry applies only to the instance running with the hostname 'name.'
127 Buffers received from the route table generator can contain multiple newline terminated
128 records, but each buffer must be less than 4K in length, and the last record in a
129 buffer may NOT be split across buffers.
132 In addition to the primary task of getting, vetting, and installing a new route table, or
133 updates to the existing table, this thread will periodically cause the send counts for each
134 endpoint known to be written to standard error. The frequency is once every 180 seconds, and
135 more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
// rtc: route table collector thread body. Receives route table messages on a private context
// (pvt_cx), copies each payload into a local buffer, and passes every newline separated record
// to parse_rt_rec(). Between messages it periodically calls rt_epcounts().
// vctx: the uta_ctx_t context, passed as void*; a nil value is logged at critical level.
// Returns NULL as its final statement (flagged unreachable by the trailing comment there).
// NOTE(review): this view omits lines of the function (e.g. declarations of eptr, tokens, ntoks,
// mlen and pbuf, several branch conditions and most closing braces); comments below describe
// only the statements that are visible.
137 static void* rtc( void* vctx ) {
138 uta_ctx_t* ctx; // context user has -- where we pin the route table
139 uta_ctx_t* pvt_cx; // private context for session with rtg
140 rmr_mbuf_t* msg = NULL; // message from rtg
141 char* payload; // payload in the message
143 size_t clen; // length to copy and mark
144 char* port; // a port number we listen/connect to
145 char* fport; // pointer to the real buffer to free
146 size_t buf_size; // nng needs var pointer not just size?
147 char* nextr; // pointer at next record in the message
148 char* curr; // current record
150 long blabber = 0; // time of last blabber so we don't flood if rtg goes bad
151 int cstate = -1; // connection state to rtg
152 int state; // processing state of some nng function
156 int pbuf_size = 0; // number allocated in pbuf
158 int raw_interface = 0; // rtg is using raw NNG/Nano not RMr to send updates
159 int vfd = -1; // verbose file des if we have one
160 int vlevel = 0; // how chatty we should be 0== no nattering allowed
162 int epfd = -1; // fd for epoll so we can multi-task
163 struct epoll_event events[1]; // list of events to give to epoll; we only have one we care about
164 struct epoll_event epe; // event definition for event to listen to
// NOTE(review): the header comment for this function mentions 180 seconds; the initial value here is 30 -- confirm which is intended
165 int count_delay = 30; // number of seconds between writing count info; initially every 30s
166 int bump_freq = 0; // time at which we will bump count frequency to every 5 minutes
169 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
170 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
// open the file named by ENV_VERBOSE_FILE (read only) and take the initial verbose level from it
174 if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
175 vfd = open( eptr, O_RDONLY );
176 vlevel = refresh_vlevel( vfd );
179 read_static_rt( ctx, vlevel ); // seed the route table if one provided
// the port string is always a strdup() copy, whether it comes from the environment or the default
181 if( (port = getenv( ENV_RTG_PORT )) == NULL || ! *port ) { // port we need to open to listen for RTG connections
182 port = strdup( DEF_RTG_PORT );
184 port = strdup( port );
187 if( (curr = getenv( ENV_RTG_RAW )) != NULL ) {
188 raw_interface = atoi( curr ) > 0; // if > 0 we assume that rtg messages are NOT coming from an RMr based process
// port is repointed at a token below, so the address returned by strdup() is kept in fport
191 fport = port; // must hold to free
193 ntoks = uta_tokenise( port, tokens, 120, ':' ); // assume tcp:port, but it could be port or old style host:port
196 port = tokens[0]; // just the port
200 port = tokens[1]; // tcp:port or :port
204 port = DEF_RTG_PORT; // this shouldn't happen, but paranoia is good
208 if( (pvt_cx = init( port, MAX_RTG_MSG_SZ, FL_NOTHREAD )) == NULL ) { // open a private context (no RT listener!)
209 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
// without a private context the thread only sleeps count_delay seconds and dumps endpoint counts, repeatedly
211 while( TRUE ) { // no listen port, just dump counts now and then
212 sleep( count_delay );
213 rt_epcounts( ctx->rtable, ctx->my_name );
216 free( fport ); // paranoid free and return
220 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rtc thread is running and listening; listening for rtg conns on %s\n", port );
223 // future: if we need to register with the rtg, then build a message and send it through a wormhole here
225 bump_freq = time( NULL ) + 300; // after 5 minutes we decrease the count frequency
227 while( 1 ) { // until the cows return, pigs fly, or somesuch event likely not to happen
228 while( msg == NULL || msg->len <= 0 ) { // until we actually have something from the other side
// NOTE(review): 1000 is presumably a receive timeout (units not shown here) -- confirm against rmr_torcv_msg()
229 msg = rmr_torcv_msg( pvt_cx, msg, 1000 );
// once the blabber time has passed, refresh the verbose level and, unless it is negative, write the counts
231 if( time( NULL ) > blabber ) {
232 vlevel = refresh_vlevel( vfd );
233 if( vlevel >= 0 ) { // allow it to be forced off with -n in verbose file
234 blabber = time( NULL ) + count_delay; // set next time to blabber, then do so
235 if( blabber > bump_freq ) {
238 rt_epcounts( ctx->rtable, ctx->my_name );
243 vlevel = refresh_vlevel( vfd ); // ensure it's fresh when we get a message
245 if( msg != NULL && msg->len > 0 ) {
246 payload = msg->payload;
247 mlen = msg->len; // usable bytes in the payload
249 rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes (%s)\n", (int) mlen, msg->payload );
251 if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes\n", (int) mlen );
// the local copy buffer is replaced when it cannot hold the payload plus a terminating zero; new size is twice the payload length
// NOTE(review): release of the previous pbuf and a check of the malloc() result are not visible in this view -- confirm
254 if( pbuf_size <= mlen ) {
261 pbuf_size = mlen * 2;
263 pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
265 memcpy( pbuf, payload, mlen );
266 pbuf[mlen] = 0; // don't depend on sender making this a legit string
269 while( curr ) { // loop over each record in the buffer
270 nextr = strchr( curr, '\n' ); // allow multiple newline records, find end of current and mark
277 rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr );
279 parse_rt_rec( ctx, curr, vlevel ); // parse record and add to in progress table
284 if( ctx->shutdown ) { // mostly for testing, but allows user app to close us down if rmr_*() function sets this
288 msg->len = 0; // force back into the listen loop
292 return NULL; // unreachable, but some compilers don't see that and complain.