1 // : vi ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019-2020 Nokia
5 Copyright (c) 2018-2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: rt_collector.c
23 Abstract: The route table collector is started as a separate pthread and
24 is responsible for listening for route table updates from a
25 route manager or route table generator process.
27 Author: E. Scott Daniels
28 Date: 29 November 2018 (extracted to common 13 March 2019)
31 #ifndef _rt_collector_c
32 #define _rt_collector_c
41 #include <sys/types.h>
45 #include <RIC_message_types.h> // needed for RMR/Rt Mgr msg types
// ---- local constants ------------------
// Flag bit kept in the collector's local flags word.
#define RTCFL_HAVE_UPDATE 0x01 // an update from RM was received

// The expansion is parenthesised so that the value stays 5120 when the macro is used inside a
// larger expression (for example x / MAX_RTC_BUF); without the parentheses operator precedence
// would split the product.
#define MAX_RTC_BUF (5 * 1024) // max buffer size we'll expect is 4k, add some fudge room
53 // ------------------------------------------------------------------------------------------------
56 Loop forever (assuming we're running in a pthread reading the static table
// rtc_file: pthread-style entry point (void* in, void* out) which loads the route table from
// a static file instead of from a route manager session.
//   vctx - must point at the caller uta_ctx_t; it is cast and checked for nil first.
// NOTE(review): several statements of this function (declarations of eptr and wbuf, the loop
// construct, the return paths) are not visible in this view; the comments below describe only
// the statements that are.
59 static void* rtc_file( void* vctx ) {
60 uta_ctx_t* ctx; // context user has -- where we pin the route table
62 int vfd = -1; // verbose file des if we have one
63 int vlevel = 0; // how chatty we should be 0== no nattering allowed
// the opaque thread argument is the user context; a nil pointer is logged as critical
67 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
68 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
// when ENV_VERBOSE_FILE is set it names a file, opened read-only, that supplies the verbosity level
// NOTE(review): the result of open() is not examined on the visible lines
72 if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
73 vfd = open( eptr, O_RDONLY );
76 ctx->flags |= CFL_NO_RTACK; // no attempt to ack when reading from a file
// at most 10 bytes of the verbose file are read and converted to the integer level;
// NOTE(review): the read() result is ignored, so wbuf content depends on statements not visible here
81 read( vfd, wbuf, 10 );
82 vlevel = atoi( wbuf );
85 read_static_rt( ctx, vlevel ); // seed the route table if one provided
// refresh_vlevel: re-read the verbosity level from the already open verbose file.
//   vfd - descriptor of the verbose file; when it is negative nothing is read.
// NOTE(review): the declarations of rbuf and vlevel, any repositioning of the file offset and
// the return statement are not visible in this view; presumably the parsed level is returned.
91 static int refresh_vlevel( int vfd ) {
95 if( vfd >= 0 ) { // if file is open, read current value
// at most 10 bytes are read and parsed with atoi; the read() result is not checked here
98 read( vfd, rbuf, 10 );
99 vlevel = atoi( rbuf );
106 Route Table Collector
107 A side thread which either attempts to connect and request a table
108 from the Route Manager, or opens a port and listens for Route Manager
109 to push table updates.
111 It may do other things along the way (latency measurements, alarms,
112 respond to RMR pings, etc.).
114 The behaviour with respect to listening for Route Manager updates vs
115 the initiation of the connection and sending a request depends on the
116 value of the ENV_RTG_ADDR (RMR_RTG_SVC) environment variable. If
117 host:port, or IP:port, is given, then we assume that we make the connection
118 and send a request for the table (request mode). If the variable is just
119 a port, then we assume Route Manager will connect and push updates (original
122 If the variable is not defined, the default behaviour, in order to be
123 backwards compatible, depends on the presence of the ENV_CTL_PORT
124 (RMR_CTL_PORT) variable (new with the support for requesting a table).
127 ENV_CTL_PORT ENV_RTG_ADDR Behaviour
128 unset unset Open default CTL port (DEF_CTL_PORT) and
129 wait for Rt Mgr to push tables
131 set unset Use the default Rt Mgr wellknown addr
132 and port (DEF_RTG_WK_ADDR) to connect
133 and request a table. The control port
134 used is the value set by ENV_CTL_PORT.
136 unset set As described above. The default control
137 port (DEF_CTL_PORT) is used.
139 When we are running in request mode, then we will send the RMR message
140 RMRRM_REFRESH to this address (wormhole) as a request for the route manager
141 to send a new table. We will attempt to connect and send requests until
142 we have a table. Calls to rmr_ready() will report FALSE until a table is
143 loaded _unless_ a seed table was given.
145 Route table information is expected to arrive on RMR messages with type
146 RMRRM_TABLE_DATA. There is NOT a specific message type for each possible
147 table record, so the payload is as it appears in the seed file or as
148 delivered in old versions. It may take several RMRRM_TABLE_DATA messages
149 to completely supply a new table or table update. See the header for parse_rt_rec
150 in common for a description of possible message contents.
152 Buffers received from the route table generator can contain multiple newline terminated
153 records, but each buffer must be less than 4K in length, and the last record in a
154 buffer may NOT be split across buffers.
157 In addition to the primary task of getting, vetting, and installing a new route table, or
158 updates to the existing table, this thread will periodically cause the send counts for each
159 endpoint known to be written to standard error. The counts are written every 30 seconds for the
160 first five minutes and every five minutes after that; a negative level in the verbose file (see ENV_VERBOSE_FILE) turns them off.
// rtc: route table collector thread entry point which talks to the route manager over a
// private RMR context.
//   vctx - must point at the caller uta_ctx_t (where the route table is pinned).
// The final visible statement returns NULL.
// NOTE(review): a number of statements (some declarations, closing braces, else/switch lines)
// are not visible in this view; the comments added below describe only the visible statements.
162 static void* rtc( void* vctx ) {
163 uta_ctx_t* ctx; // context user has -- where we pin the route table
164 uta_ctx_t* pvt_cx; // private context for session with rtg
165 rmr_mbuf_t* msg = NULL; // message from rtg
166 char* payload; // payload in the message
168 char* my_port; // the port number that we will listen on (4561 has been the default for this)
169 char* rtg_addr; // host:port address of route table generator (route manager)
170 char* daddr; // duplicated rtg address string to parse/trash
171 size_t buf_size; // nng needs var pointer not just size?
172 char* nextr; // pointer at next record in the message
173 char* curr; // current record
175 long blabber = 0; // time of last blabber so we don't flood if rtg goes bad
176 int cstate = -1; // connection state to rtg
177 int state; // processing state of some nng function
181 int pbuf_size = 0; // number allocated in pbuf
183 int vfd = -1; // verbose file des if we have one
184 int vlevel = 0; // how chatty we should be 0== no nattering allowed
// NOTE(review): epfd, events and epe are not referenced by any visible statement of this
// function -- confirm whether they are still needed
186 int epfd = -1; // fd for epoll so we can multi-task
187 struct epoll_event events[1]; // list of events to give to epoll; we only have one we care about
188 struct epoll_event epe; // event definition for event to listen to
189 int count_delay = 30; // number of seconds between writing count info; initially every 30s
190 int bump_freq = 0; // time at which we will bump count frequency to every 5 minutes
// the opaque thread argument is the user context; a nil pointer is logged as critical
194 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
195 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
// optional verbose file: opened read-only and used to obtain the initial verbosity level
199 if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
200 vfd = open( eptr, O_RDONLY );
201 vlevel = refresh_vlevel( vfd );
// CFL_NO_RTACK is set only for the duration of the seed table load and cleared right after
204 ctx->flags |= CFL_NO_RTACK; // don't ack when reading from a file
205 read_static_rt( ctx, vlevel ); // seed the route table if one provided
206 ctx->flags &= ~CFL_NO_RTACK;
// choose the control port and the default route manager address; the branch structure
// between the two daddr assignments below is not visible in this view
209 my_port = getenv( ENV_CTL_PORT ); // default port to listen on (likely 4561)
210 if( my_port == NULL || ! *my_port ) { // if undefined, then go with default
211 my_port = DEF_CTL_PORT;
212 daddr = DEF_CTL_PORT; // backwards compat; if ctl port not hard defined, default is to listen
214 daddr = DEF_RTG_WK_ADDR; // if ctl port is defined, then default changes to connecting to well known RM addr
// ENV_RTG_ADDR, when set and not empty, overrides the default address chosen above
217 if( (rtg_addr = getenv( ENV_RTG_ADDR )) == NULL || ! *rtg_addr ) { // undefined, use default set above
221 daddr = strdup( rtg_addr ); // dup to destroy during parse
// split the address on colons; the token count decides between listen mode and request mode
223 ntoks = uta_tokenise( daddr, tokens, 120, ':' ); // should be host:port of rt mgr (could be port only which we assume is old listen port)
225 case 0: // should not happen, but prevent accidents and allow default to ignore additional tokens
229 my_port = tokens[0]; // just port -- assume backlevel environment where we just listen
230 flags |= RTCFL_HAVE_UPDATE; // prevent sending update requests
234 if( strcmp( tokens[0], "tcp" ) == 0 ) { // old school nng tcp:xxxx so we listen on xxx
235 flags |= RTCFL_HAVE_UPDATE; // and signal not to try to request an update
238 // rtg_addr points at rt mgr address and my port set from env or default stands as is
// private context creation; on failure the thread only sleeps and writes endpoint counts
243 if( (pvt_cx = init( my_port, MAX_RTC_BUF, FL_NOTHREAD )) == NULL ) { // open a private context (no RT listener!)
244 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
246 while( TRUE ) { // no listen port, just dump counts now and then
247 sleep( count_delay );
248 rt_epcounts( ctx->rtable, ctx->my_name );
256 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rtc thread is running and listening; listening for rtg conns on %s\n", my_port );
258 bump_freq = time( NULL ) + 300; // after 5 minutes we decrease the count frequency
260 while( 1 ) { // until the cows return, pigs fly, or somesuch event likely not to happen
261 while( msg == NULL || msg->len <= 0 ) { // until we actually have something from the other side
// request mode: while no update has been seen, open a wormhole to rtg_addr when none is open
// (rtg_whid < 0) and send a table request; any guard between the two calls is not visible here
262 if( (flags & RTCFL_HAVE_UPDATE) == 0 ) { // no route table updated from rt mgr; request one
263 if( ctx->rtg_whid < 0 ) {
264 ctx->rtg_whid = rmr_wh_open( pvt_cx, rtg_addr );
266 send_update_req( pvt_cx, ctx );
// receive with a timeout argument of 1000 (presumably milliseconds -- confirm against rmr_torcv_msg)
269 msg = rmr_torcv_msg( pvt_cx, msg, 1000 );
// periodic endpoint count output, rate limited through blabber and count_delay
271 if( time( NULL ) > blabber ) {
272 vlevel = refresh_vlevel( vfd );
273 if( vlevel >= 0 ) { // allow it to be forced off with -n in verbose file
274 blabber = time( NULL ) + count_delay; // set next time to blabber, then do so
275 if( blabber > bump_freq ) {
278 rt_epcounts( ctx->rtable, ctx->my_name );
283 vlevel = refresh_vlevel( vfd ); // ensure it's fresh when we get a message
285 if( msg != NULL && msg->len > 0 ) {
286 payload = msg->payload;
287 mlen = msg->len; // usable bytes in the payload
// NOTE(review): the payload is printed with %s here before any terminator is guaranteed (the
// terminated copy is only made further down); the condition guarding this call is not visible
289 rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: received rt message type=%d len=%d bytes (%s)\n", msg->mtype, (int) mlen, msg->payload );
291 if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: received rt message type=%d len=%d\n", msg->mtype, (int) mlen );
294 switch( msg->mtype ) {
295 case RMRRM_TABLE_DATA:
// the first table data message marks the start of the flow and stops further update requests
296 if( (flags & RTCFL_HAVE_UPDATE) == 0 ) {
297 flags |= RTCFL_HAVE_UPDATE;
298 rmr_vlog( RMR_VL_INFO, "message flow from route manager starts\n" );
// grow the private work buffer when it cannot hold mlen bytes plus a terminator; the new size
// is twice mlen. NOTE(review): no check of the malloc result and no release of the previous
// buffer is visible in this view
301 if( pbuf_size <= mlen ) {
308 pbuf_size = mlen * 2;
310 pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
// work on a private, terminated copy of the payload
312 memcpy( pbuf, payload, mlen );
313 pbuf[mlen] = 0; // don't depend on sender making this a legit string
316 while( curr ) { // loop over each record in the buffer
317 nextr = strchr( curr, '\n' ); // allow multiple newline records, find end of current and mark
324 rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr );
326 parse_rt_rec( ctx, pvt_cx, curr, vlevel, msg ); // parse record and add to in progress table; ack using rts to msg
331 msg->len = 0; // force back into the listen loop
// warning for any message type not handled above (presumably the default case of the switch)
335 rmr_vlog( RMR_VL_WARN, "rmr_rtc: invalid message type=%d len=%d\n", msg->mtype, (int) msg->len );
340 if( ctx->shutdown ) { // mostly for testing, but allows user app to close us down if rmr_*() function sets this
346 return NULL; // unreachable, but some compilers don't see that and complain.
350 // this is nng specific inasmuch as we allow raw (non-RMR) messages
353 NOTE: This is the original rtc code when we supported "raw" nano/nng messages
354 from the route manager. It is deprecated in favour of managing all RM-RMR
355 communications via an RMR session.
357 The rtc() function above is the new and preferred function regardless
360 -----------------------------------------------------------------------------------
361 Route Table Collector
362 A side thread which opens a socket and subscribes to a routing table generator.
363 It may do other things along the way (latency measurements?).
365 The pointer is a pointer to the context.
367 Listens for records from the route table generation publisher, expecting
368 one of the following, newline terminated, ASCII records:
369 rte|msg-type||]name:port,name:port,...;name:port,... // route table entry with one or more groups of endpoints
370 new|start // start of new table
371 new|end // end of new table; complete
373 Name must be a host name which can be looked up via gethostbyname() (DNS).
375 Multiple endpoints (name:port) may be given separated by a comma; an endpoint is selected using round robin
376 for each message of the type that is sent.
378 Multiple endpoint groups can be given as a comma separated list of endpoints, separated by semicolons:
379 group1n1:port,group1n2:port,group1n3:port;group2n1:port,group2n2:port
381 If multiple groups are given, when send() is called for the corresponding message type,
382 the message will be sent to one endpoint in each group.
384 msg-type is the numeric message type (e.g. 102). If it is given as n,name then it is assumed
385 that the entry applies only to the instance running with the hostname 'name.'
387 Buffers received from the route table generator can contain multiple newline terminated
388 records, but each buffer must be less than 4K in length, and the last record in a
389 buffer may NOT be split across buffers.
392 In addition to the primary task of getting, vetting, and installing a new route table, or
393 updates to the existing table, this thread will periodically cause the send counts for each
394 endpoint known to be written to standard error. The counts are written every 30 seconds for the
395 first five minutes and every five minutes after that; a negative level in the verbose file (see ENV_VERBOSE_FILE) turns them off.
// raw_rtc: deprecated route table collector thread entry point which can accept raw (non-RMR)
// messages from the route table generator as well as RMR messages.
//   vctx - must point at the caller uta_ctx_t (where the route table is pinned).
// The final visible statement returns NULL.
// NOTE(review): a number of statements (some declarations, closing braces, else/switch lines)
// are not visible in this view; the comments added below describe only the visible statements.
397 static void* raw_rtc( void* vctx ) {
398 uta_ctx_t* ctx; // context user has -- where we pin the route table
399 uta_ctx_t* pvt_cx; // private context for session with rtg
400 rmr_mbuf_t* msg = NULL; // message from rtg
401 char* payload; // payload in the message
403 char* port; // a port number we listen/connect to
404 char* fport; // pointer to the real buffer to free
405 size_t buf_size; // nng needs var pointer not just size?
406 char* nextr; // pointer at next record in the message
407 char* curr; // current record
409 long blabber = 0; // time of last blabber so we don't flood if rtg goes bad
410 int cstate = -1; // connection state to rtg
411 int state; // processing state of some nng function
415 int pbuf_size = 0; // number allocated in pbuf
417 int raw_interface = 1; // rtg is using raw NNG/Nano not RMr to send updates
418 int vfd = -1; // verbose file des if we have one
419 int vlevel = 0; // how chatty we should be 0== no nattering allowed
421 int epfd = -1; // fd for epoll so we can multi-task
422 struct epoll_event events[1]; // list of events to give to epoll; we only have one we care about
423 struct epoll_event epe; // event definition for event to listen to
424 int rcv_fd = -1; // pollable file des from NNG to use for timeout
425 int count_delay = 30; // number of seconds between writing count info; initially every 30s
426 int bump_freq = 0; // time at which we will bump count frequency to every 5 minutes
// the opaque thread argument is the user context; a nil pointer is logged as critical
429 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
430 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
// optional verbose file: opened read-only and used to obtain the initial verbosity level
434 if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
435 vfd = open( eptr, O_RDONLY );
436 vlevel = refresh_vlevel( vfd );
439 read_static_rt( ctx, vlevel ); // seed the route table if one provided
// the listen port string is always a heap copy (of the default or of the environment value)
// because the tokeniser below is handed the buffer; fport keeps the pointer for free()
441 if( (port = getenv( ENV_RTG_PORT )) == NULL || ! *port ) { // port we need to open to listen for RTG connections
442 port = strdup( DEF_RTG_PORT );
444 port = strdup( port );
// NOTE(review): the next three lines read like the interior of a disabled (commented out)
// block whose delimiters are not visible in this view -- confirm before relying on them
448 this test is now done in init and this function is started _only_ if the value was 1
449 if( (curr = getenv( ENV_RTG_RAW )) != NULL ) {
450 raw_interface = atoi( curr ) > 0; // if > 0 we assume that rtg messages are NOT coming from an RMr based process
454 fport = port; // must hold to free
// split on colons; which token is used as the port depends on the token count (the switch
// lines themselves are not visible here)
456 ntoks = uta_tokenise( port, tokens, 120, ':' ); // assume tcp:port, but it could be port or old style host:port
459 port = tokens[0]; // just the port
463 port = tokens[1]; // tcp:port or :port
467 port = DEF_RTG_PORT; // this shouldn't happen, but paranoia is good
// private context creation; on failure the thread only sleeps and writes endpoint counts
471 if( (pvt_cx = init( port, MAX_RTG_MSG_SZ, FL_NOTHREAD )) == NULL ) { // open a private context
472 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
474 while( TRUE ) { // no listen port, just dump counts now and then
475 sleep( count_delay );
476 rt_epcounts( ctx->rtable, ctx->my_name );
479 free( fport ); // paranoid free and return
// epoll setup on the receive descriptor so that the receive loop can time out; each failure
// is only warned about, with the stated effect that counts appear only after an update arrives
483 if( (rcv_fd = rmr_get_rcvfd( pvt_cx )) >= 0 ) { // get the epoll fd for the rtg socket
485 rmr_vlog( RMR_VL_WARN, "cannot get epoll fd for rtg session; stats will generate only after update from rt manager\n" );
487 if( (epfd = epoll_create1( 0 )) < 0 ) {
488 rmr_vlog( RMR_VL_WARN, "stats will generate only after rt manager update; unable to create epoll fd for rtg session: %s\n", strerror( errno ) );
// register interest in readability of the receive descriptor
491 epe.events = EPOLLIN;
492 epe.data.fd = rcv_fd;
494 if( epoll_ctl( epfd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 ) {
495 rmr_vlog( RMR_VL_WARN, "stats will generate only after rt manager update; unable to init epoll_ctl: %s\n", strerror( errno ) );
502 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rtc thread is running and listening; listening for rtg conns on %s\n", port );
505 // future: if we need to register with the rtg, then build a message and send it through a wormhole here
507 bump_freq = time( NULL ) + 300; // after 5 minutes we decrease the count frequency
509 while( 1 ) { // until the cows return, pigs fly, or somesuch event likely not to happen
510 while( msg == NULL || msg->len <= 0 ) { // until we actually have something from the other side
// with a usable rcv_fd the receive is attempted only after epoll_wait reports an event within
// 1000 ms; with rcv_fd < 0 the receive call is made directly
511 if( rcv_fd < 0 || epoll_wait( epfd, events, 1, 1000 ) > 0 ) { // skip epoll if init failed, else block for max 1 sec
512 if( raw_interface ) {
513 msg = (rmr_mbuf_t *) rcv_payload( pvt_cx, msg ); // receive from non-RMr sender
515 msg = rmr_rcv_msg( pvt_cx, msg ); // receive from an RMr sender
517 } else { // no msg, do extra tasks
518 if( msg != NULL ) { // if we were working with a message; ensure no len
520 msg->state = RMR_ERR_TIMEOUT;
// periodic endpoint count output, rate limited through blabber and count_delay
524 if( time( NULL ) > blabber ) {
525 vlevel = refresh_vlevel( vfd );
526 if( vlevel >= 0 ) { // allow it to be forced off with -n in verbose file
527 blabber = time( NULL ) + count_delay; // set next time to blabber, then do so
528 if( blabber > bump_freq ) {
531 rt_epcounts( ctx->rtable, ctx->my_name );
536 vlevel = refresh_vlevel( vfd ); // ensure it's fresh when we get a message
538 if( msg != NULL && msg->len > 0 ) {
539 payload = msg->payload;
540 mlen = msg->len; // usable bytes in the payload
// NOTE(review): the payload is printed with %s here before any terminator is guaranteed (the
// terminated copy is only made further down); the condition guarding this call is not visible
542 rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes (%s)\n", (int) mlen, msg->payload );
544 if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes\n", (int) mlen );
// grow the private work buffer when it cannot hold mlen bytes plus a terminator; the new size
// is twice mlen. NOTE(review): no check of the malloc result and no release of the previous
// buffer is visible in this view
547 if( pbuf_size <= mlen ) {
554 pbuf_size = mlen * 2;
556 pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
// work on a private, terminated copy of the payload
558 memcpy( pbuf, payload, mlen );
559 pbuf[mlen] = 0; // don't depend on sender making this a legit string
562 while( curr ) { // loop over each record in the buffer
563 nextr = strchr( curr, '\n' ); // allow multiple newline records, find end of current and mark
570 rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr );
// raw senders cannot be acked, so the parser gets neither the private context nor the message
572 if( raw_interface ) {
573 parse_rt_rec( ctx, NULL, curr, vlevel, NULL ); // nil pvt to parser as we can't ack messages
575 parse_rt_rec( ctx, pvt_cx, curr, vlevel, msg ); // parse record and add to in progress table
581 if( ctx->shutdown ) { // mostly for testing, but allows user app to close us down if rmr_*() function sets this
585 msg->len = 0; // force back into the listen loop
589 return NULL; // unreachable, but some compilers don't see that and complain.