1 // : vi ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: rt_collector.c
23 Abstract: The route table collector is started as a separate pthread and
24 is responsible for listening for route table updates from a
25 route manager or route table generator process.
27 Author: E. Scott Daniels
28 Date: 29 November 2018 (extracted to common 13 March 2019)
31 #ifndef _rt_collector_c
32 #define _rt_collector_c
41 #include <sys/types.h>
47 A side thread which opens a socket and subscribes to a routing table generator.
48 It may do other things along the way (latency measurements?).
50 The pointer is a pointer to the context.
52 Listens for records from the route table generation publisher, expecting
53 one of the following, newline terminated, ASCII records:
54 rte|msg-type||]name:port,name:port,...;name:port,... // route table entry with one or more groups of endpoints
55 new|start // start of new table
56 new|end // end of new table; complete
58 Name must be a host name which can be looked up via gethostbyname() (DNS).
60 Multiple endpoints (name:port) may be given separated by a comma; an endpoint is selected using round robin
61 for each message of the type that is sent.
63 Multiple endpoint groups can be given as a comma separated list of endpoints, separated by semicolons:
64 group1n1:port,group1n2:port,group1n3:port;group2n1:port,group2n2:port
66 If multiple groups are given, when send() is called for the corresponding message type,
67 the message will be sent to one endpoint in each group.
69 msg-type is the numeric message type (e.g. 102). If it is given as n,name then it is assumed
70 that the entry applies only to the instance running with the hostname 'name.'
72 Buffers received from the route table generator can contain multiple newline terminated
73 records, but each buffer must be less than 4K in length, and the last record in a
74 buffer may NOT be split across buffers.
77 static void* rtc( void* vctx ) {
78 uta_ctx_t* ctx; // context user has -- where we pin the route table
79 uta_ctx_t* pvt_cx; // private context for session with rtg
80 rmr_mbuf_t* msg = NULL; // message from rtg
81 char* payload; // payload in the message
83 size_t clen; // length to copy and mark
84 char* port; // a port number we listen/connect to
85 char* fport; // pointer to the real buffer to free
86 size_t buf_size; // nng needs var pointer not just size?
87 char* nextr; // pointer at next record in the message
88 char* curr; // current record
90 long blabber = 0; // time of last blabber so we don't flood if rtg goes bad
91 int cstate = -1; // connection state to rtg
92 int state; // processing state of some nng function
96 int pbuf_size = 0; // number allocated in pbuf
98 int raw_interface = 1; // rtg is using raw NNG/Nano not RMr to send updates
99 int vfd = -1; // verbose file des if we have one
100 int vlevel = 0; // how chatty we should be 0== no nattering allowed
103 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
104 fprintf( stderr, "[CRI] rmr_rtc: internal mishap: context passed in was nil\n" );
108 if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
109 vfd = open( eptr, O_RDONLY );
113 read( vfd, wbuf, 10 );
114 vlevel = atoi( wbuf );
118 read_static_rt( ctx, vlevel ); // seed the route table if one provided
120 if( (port = getenv( ENV_RTG_PORT )) == NULL || ! *port ) { // port we need to open to listen for RTG connections
121 port = strdup( DEF_RTG_PORT );
123 port = strdup( port );
126 if( (curr = getenv( ENV_RTG_RAW )) != NULL ) {
127 raw_interface = atoi( curr ) > 0; // if > 0 we assume that rtg messages are NOT coming from an RMr based process
130 fport = port; // must hold to free
132 ntoks = uta_tokenise( port, tokens, 120, ':' ); // assume tcp:port, but it could be port or old style host:port
135 port = tokens[0]; // just the port
139 port = tokens[1]; // tcp:port or :port
143 port = DEF_RTG_PORT; // this shouldn't happen, but parnioia is good
147 if( (pvt_cx = init( port, MAX_RTG_MSG_SZ, FL_NOTHREAD )) == NULL ) { // open a private context
148 fprintf( stderr, "[CRI] rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
153 if( DEBUG ) fprintf( stderr, "[DBUG] rtc thread is running and listening; listening for rtg conns on %s\n", port );
156 // future: if we need to register with the rtg, then build a message and send it through a wormhole here
159 while( 1 ) { // until the cows return, pigs fly, or somesuch event likely not to happen
160 if( raw_interface ) {
161 msg = (rmr_mbuf_t *) rcv_payload( pvt_cx, msg ); // receive from non-RMr sender
163 msg = rmr_rcv_msg( pvt_cx, msg ); // receive from an RMr sender
166 if( vfd >= 0 ) { // if changed since last go round
169 read( vfd, wbuf, 10 );
170 vlevel = atoi( wbuf );
173 if( msg != NULL && msg->len > 0 ) {
174 payload = msg->payload;
175 mlen = msg->len; // usable bytes in the payload
177 fprintf( stderr, "[DBUG] rmr_rtc: received rt message; %d bytes (%s)\n", (int) mlen, msg->payload );
179 if( DEBUG > 1 || (vlevel > 0) ) fprintf( stderr, "[DBUG] rmr_rtc: received rt message; %d bytes\n", (int) mlen );
182 if( pbuf_size <= mlen ) {
189 pbuf_size = mlen * 2;
191 pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
193 memcpy( pbuf, payload, mlen );
194 pbuf[mlen] = 0; // don't depend on sender making this a legit string
197 while( curr ) { // loop over each record in the buffer
198 nextr = strchr( curr, '\n' ); // allow multiple newline records, find end of current and mark
205 fprintf( stderr, "[DBUG] rmr_rtc: processing (%s)\n", curr );
207 parse_rt_rec( ctx, curr, vlevel ); // parse record and add to in progress table
212 if( ctx->shutdown ) { // mostly for testing, but allows user app to close us down if rmr_*() function sets this
216 if( time( NULL ) > blabber ) {
217 fprintf( stderr, "[WRN] rmr_rtc: nil buffer, or 0 len msg, received from rtg\n" );
218 blabber = time( NULL ) + 180; // limit to 1 every 3 min or so
223 return NULL; // unreachable, but some compilers don't see that and complain.