1 // : vi ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019-2021 Nokia
5 Copyright (c) 2018-2021 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: rt_collector.c
23 Abstract: The route table collector is started as a separate pthread and
24 is responsible for listening for route table updates from a
25 route manager or route table generator process.
27 Author: E. Scott Daniels
28 Date: 29 November 2018 (extracted to common 13 March 2019)
31 #ifndef _rt_collector_c
32 #define _rt_collector_c
41 #include <sys/types.h>
45 #include <RIC_message_types.h> // needed for RMR/Rt Mgr msg types
// ---- local constants ------------------

#define RTCFL_HAVE_UPDATE	0x01		// an update from RM was received

// Max buffer size we expect is 4k; add some fudge room.
// Parenthesised so the macro always expands as a single value, even inside
// larger expressions (e.g. n / MAX_RTC_BUF); unparenthesised "5 * 1024" would
// silently bind to neighbouring operators.
#define MAX_RTC_BUF			(5 * 1024)
53 // ------------------------------------------------------------------------------------------------
56 Opens the vlevel control file if needed and reads the vlevel from it.
57 The file is rewound if already open so that external updates are captured.
58 The current level is returned; 0 on error.
60 The environment variable (ENV_VERBOSE_FILE) is used to supply the file to
61 open and read. If missing, we will try /tmp/rmr.v. We will try to open the file
62 on each call if not already open; this allows the value to be supplied after
63 start which helps debugging.
65 If close_file is true, then we will close the open vfd and return 0.
// refresh_vlevel: return the current verbosity level read from the vlevel control file.
//   close_file - when true the open verbose file is closed and 0 is returned (callers such as
//                rtc_file() pass 1 at shutdown to release the descriptor).
// Returns the integer parsed from the first 10 bytes of the file; 0 when the file cannot be used.
// NOTE(review): the declarations of vfd/eptr/vlevel, the close_file branch and the closing
// braces are not visible in this listing; the comments below describe only the visible lines.
67 extern int refresh_vlevel( int close_file ) {
71 char wbuf[128]; // read buffer; MUST be 11 or greater
// Only when nothing is open yet: prefer the path named by ENV_VERBOSE_FILE, else fall back to
// /tmp/rmr.v. Retrying on every call lets the file be created after the process starts.
82 if( vfd < 0 ) { // attempt to find/open on all calls if not open
83 if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
84 vfd = open( eptr, O_RDONLY );
86 vfd = open( "/tmp/rmr.v", O_RDONLY );
// Zero 11 bytes but read at most 10, so the text handed to atoi() is always nil terminated.
93 memset( wbuf, 0, sizeof( char ) * 11 ); // ensure what we read will be nil terminated
// Rewind before each read (lseek returns the new offset, 0 on success) so that edits made to
// the file after it was opened are picked up.
94 if( lseek( vfd, 0, SEEK_SET ) == 0 && read( vfd, wbuf, 10 ) > 0 ) {
// NOTE(review): atoi() cannot report a parse error (non-numeric text yields 0); strtol() with an
// end-pointer check would allow the value to be validated.
95 vlevel = atoi( wbuf );
102 Loop forever (assuming we're running in a pthread reading the static table
// rtc_file: thread body used when the route table is supplied only by the static file.
//   vctx - the user's uta_ctx_t, where the route table is pinned; NULL is logged as critical.
// Acks are disabled (CFL_NO_RTACK) because there is no route manager session to acknowledge.
// Each pass refreshes the verbosity level and re-reads the static table via read_static_rt().
// A non-zero ctx->shutdown closes the verbose file and ends the thread (graceful stop / unit tests).
// NOTE(review): the loop header, the return statements and the body of the rtable_ready branch
// are not visible in this listing.
105 static void* rtc_file( void* vctx ) {
106 uta_ctx_t* ctx; // context user has -- where we pin the route table
108 int vlevel = 0; // how chatty we should be 0== no nattering allowed
112 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
113 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
117 ctx->flags |= CFL_NO_RTACK; // no attempt to ack when reading from a file
119 vlevel = refresh_vlevel( 0 );
120 read_static_rt( ctx, vlevel ); // refresh from the file
122 if( ctx->shutdown != 0 ) { // allow for graceful termination and unit testing
123 refresh_vlevel( 1 ); // close the verbose file if open
127 if( ctx->rtable_ready ) {
130 sleep( 1 ); // check every second until we have a good one
136 Rtc_parse_msg parses a single message from the route manager. We allow multiple, newline terminated,
137 records in each message; it is required that the last record in the message be complete (we do not
138 reconstruct records split over multiple messages). For each record, we call the record parser
139 to parse and add the information to the table being built.
141 This function was broken out of the main rtc() function in order to be able to unit test it. Without
142 this as a standalone function, it was impossible to simulate a message arriving on the RTC's private
145 To reduce malloc/free cycles, we allocate a static work buffer and expand it when needed; in other
146 words, this is not thread safe but it shouldn't need to be.
// rtc_parse_msg: process one message received on the route manager session.
//   ctx    - user context owning the route table being built
//   pvt_cx - private RMR context for the route manager session; passed to parse_rt_rec()
//   msg    - received message; a RMRRM_TABLE_DATA payload may hold several newline terminated records
//   vlevel - verbosity level; > 0 enables the per-message debug log
//   flags  - in/out collector flags; RTCFL_HAVE_UPDATE is set on the first table-data message
// After table data is parsed msg->len is set to 0 so the caller returns to its receive loop.
// Other message types are logged as warnings.
// The static pbuf/pbuf_size pair makes this function non-reentrant.
148 static void rtc_parse_msg( uta_ctx_t *ctx, uta_ctx_t* pvt_cx, rmr_mbuf_t* msg, int vlevel, int* flags ) {
149 static unsigned char* pbuf = NULL;
150 static int pbuf_size = 0;
152 unsigned char* payload;
154 unsigned char* nextr;
157 payload = msg->payload;
158 mlen = msg->len; // usable bytes in the payload
160 if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: received rt message type=%d len=%d\n", msg->mtype, (int) mlen );
161 switch( msg->mtype ) {
162 case RMRRM_TABLE_DATA:
// The first table-data message marks the start of flow from route manager; the caller uses the
// flag to stop sending update requests.
163 if( (*flags & RTCFL_HAVE_UPDATE) == 0 ) {
164 *flags |= RTCFL_HAVE_UPDATE;
165 rmr_vlog( RMR_VL_INFO, "message flow from route manager starts\n" );
// Grow when pbuf_size <= mlen (not <) so there is always room for the nil written at pbuf[mlen].
168 if( pbuf_size <= mlen ) {
175 pbuf_size = mlen * 2;
// NOTE(review): pbuf is unsigned char*, so the (char *) cast mismatches the declaration; and no
// NULL check on the malloc result is visible before the memcpy below.
177 pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
179 memcpy( pbuf, payload, mlen );
180 pbuf[mlen] = 0; // don't depend on sender making this a legit string
182 rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: rt message: (%s)\n", pbuf );
186 while( curr ) { // loop over each record in the buffer
187 nextr = strchr( (char *) curr, '\n' ); // allow multiple newline records, find end of current and mark
194 rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc_parse_msg: snarf_fd=%d processing (%s)\n", ctx ? ctx->snarf_rt_fd : -99, curr );
196 parse_rt_rec( ctx, pvt_cx, curr, vlevel, msg ); // parse record and add to in progress table; ack using rts to msg
201 msg->len = 0; // force back into the listen loop
205 rmr_vlog( RMR_VL_WARN, "rmr_rtc: invalid message type=%d len=%d\n", msg->mtype, (int) msg->len );
211 Route Table Collector
212 A side thread which either attempts to connect and request a table
213 from the Route Manager, or opens a port and listens for Route Manager
214 to push table updates.
216 It may do other things along the way (latency measurements, alarms,
217 respond to RMR pings, etc.).
219 The behaviour with respect to listening for Route Manager updates vs
220 the initiation of the connection and sending a request depends on the
221 value of the ENV_RTG_ADDR (RMR_RTG_SVC) environment variable. If
222 host:port, or IP:port, is given, then we assume that we make the connection
223 and send a request for the table (request mode). If the variable is just
224 a port, then we assume Route Manager will connect and push updates (original
227 If the variable is not defined, the default behaviour, in order to be
228 backwards compatible, depends on the presence of the ENV_CTL_PORT
229 (RMR_CTL_PORT) variable (new with the support for requesting a table).
232 ENV_CTL_PORT ENV_RTG_ADDR Behaviour
233 unset unset Open default CTL port (DEF_CTL_PORT) and
234 wait for Rt Mgr to push tables
236 set unset Use the default Rt Mgr wellknown addr
237 and port (DEF_RTG_WK_ADDR) to connect
238 and request a table. The control port
239 used is the value set by ENV_CTL_PORT.
241 unset set As described above. The default control
242 port (DEF_CTL_PORT) is used.
244 When we are running in request mode, then we will send the RMR message
245 RMRRM_REFRESH to this address (wormhole) as a request for the route manager
246 to send a new table. We will attempt to connect and send requests until
247 we have a table. Calls to rmr_ready() will report FALSE until a table is
248 loaded _unless_ a seed table was given.
250 Route table information is expected to arrive on RMR messages with type
251 RMRRM_TABLE_DATA. There is NOT a specific message type for each possible
252 table record, so the payload is as it appears in the seed file or as
253 delivered in old versions. It may take several RMRRM_TABLE_DATA messages
254 to completely supply a new table or table update. See the header for parse_rt_rec
255 in common for a description of possible message contents.
257 Buffers received from the route table generator can contain multiple newline terminated
258 records, but each buffer must be less than 4K in length, and the last record in a
259 buffer may NOT be split across buffers.
262 In addition to the primary task of getting, vetting, and installing a new route table, or
263 updates to the existing table, this thread will periodically cause the send counts for each
264 endpoint known to be written to standard error. The frequency is once every 180 seconds, and
265 more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
// rtc: route table collector thread body (RMR session to the route manager).
//   vctx - the user's uta_ctx_t, where the route table is pinned; NULL is logged as critical.
// Sequence visible here:
//   1. read the verbosity level and an optional table-request frequency override (1..300 s);
//   2. seed the table from the static file with acks disabled, then clear CFL_FULLRT so the
//      seed is not counted as a full table from the route generator;
//   3. pick the control port and the route manager address from ENV_CTL_PORT / ENV_RTG_ADDR;
//   4. open a private RMR context (pvt_cx) on that port; on failure only dump counts forever;
//   5. loop:
//      - request a table through a wormhole until RTCFL_HAVE_UPDATE is set;
//      - receive with a 1000 ms timeout;
//      - periodically write endpoint counts and run alarm_if_drops();
//      - pass received messages to rtc_parse_msg().
// A non-zero ctx->shutdown is the visible exit condition; the function returns NULL.
// NOTE(review): several lines (declarations of eptr/ntoks/tokens/flags, switch headers, braces,
// some branches) are not visible in this listing; comments cover only the visible lines.
267 static void* rtc( void* vctx ) {
268 uta_ctx_t* ctx; // context user has -- where we pin the route table
269 uta_ctx_t* pvt_cx; // private context for session with rtg
270 rmr_mbuf_t* msg = NULL; // message from rtg
271 route_table_t* rt; // the routing table that will be traversed to print statistics
272 char* my_port; // the port number that we will listen on (4561 has been the default for this)
273 char* rtg_addr; // host:port address of route table generator (route manager)
274 char* daddr; // duplicated rtg address string to parse/trash
// NOTE(review): buf_size, cstate, state, epfd, events and epe are not referenced in the visible
// lines of this function (they look like leftovers from raw_rtc()); confirm and remove.
275 size_t buf_size; // nng needs var pointer not just size?
277 long blabber = 0; // time of last blabber so we don't flood if rtg goes bad
278 int cstate = -1; // connection state to rtg
279 int state; // processing state of some nng function
283 int vlevel = 0; // how chatty we should be 0== no nattering allowed
285 int epfd = -1; // fd for epoll so we can multi-task
286 struct epoll_event events[1]; // list of events to give to epoll; we only have one we care about
287 struct epoll_event epe; // event definition for event to listen to
288 int count_delay = 30; // number of seconds between writing count info; initially every 30s
289 int bump_freq = 0; // time at which we will bump count frequency to every 5 minutes
291 int rt_req_freq = DEF_RTREQ_FREQ; // request frequency (sec) when wanting a new table
292 int nxt_rt_req = 0; // time of next request
295 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
296 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
300 vlevel = refresh_vlevel( 0 );
// NOTE(review): ENV_RTREQ_FREA reads like a misspelling of ...FREQ; it is defined outside this
// view, so it is left unchanged here.
302 if( (eptr = getenv( ENV_RTREQ_FREA )) != NULL ) {
303 rt_req_freq = atoi( eptr );
304 if( rt_req_freq < 1 || rt_req_freq > 300 ) {
// NOTE(review): rt_req_freq is reset to the default before the warning is logged, so the message
// prints the default twice instead of the rejected value. Log first, then reset.
305 rt_req_freq = DEF_RTREQ_FREQ;
306 rmr_vlog( RMR_VL_WARN, "rmr_rtc: RT request frequency (%d) out of range (1-300), using default (%d)\n", rt_req_freq, DEF_RTREQ_FREQ );
// NOTE(review): unlike the other log calls, this format string has no trailing newline.
309 rmr_vlog( RMR_VL_INFO, "rmr_rtc: RT request frequency set to: %d seconds", rt_req_freq );
311 ctx->flags |= CFL_NO_RTACK; // don't ack when reading from a file
312 read_static_rt( ctx, vlevel ); // seed the route table if one provided
313 ctx->flags &= ~CFL_NO_RTACK;
314 ctx->flags &= ~CFL_FULLRT; // even though rmr-ready goes true, the seed doesn't count as a full RT from route generator
// The default route manager address depends on whether a control port was set explicitly:
// if not, default to listening on the default control port; if so, connect to the well known RM address.
317 my_port = getenv( ENV_CTL_PORT ); // default port to listen on (likely 4561)
318 if( my_port == NULL || ! *my_port ) { // if undefined, then go with default
319 my_port = DEF_CTL_PORT;
320 daddr = DEF_CTL_PORT; // backwards compat; if ctl port not hard defined, default is to listen
322 daddr = DEF_RTG_WK_ADDR; // if ctl port is defined, then default changes to connecting to well known RM addr
325 if( (rtg_addr = getenv( ENV_RTG_ADDR )) == NULL || ! *rtg_addr ) { // undefined, use default set above
// The copy is destroyed by uta_tokenise(); tokens[] (and possibly my_port) point into it, so it is
// not freed here. NOTE(review): no free of daddr is visible — confirm that is intentional.
329 daddr = strdup( rtg_addr ); // dup to destroy during parse
// NOTE(review): assumes tokens[] has room for 120 entries; its declaration is not visible here.
331 ntoks = uta_tokenise( daddr, tokens, 120, ':' ); // should be host:port of rt mgr (could be port only which we assume is old listen port)
333 case 0: // should not happen, but prevent accidents and allow default to ignore additional tokens
337 my_port = tokens[0]; // just port -- assume backlevel environment where we just listen
338 flags |= RTCFL_HAVE_UPDATE; // prevent sending update requests
342 if( strcmp( tokens[0], "tcp" ) == 0 ) { // old school nng tcp:xxxx so we listen on xxx
343 flags |= RTCFL_HAVE_UPDATE; // and signal not to try to request an update
346 // rtg_addr points at rt mgr address and my port set from env or default stands as is
351 if( (pvt_cx = init( my_port, MAX_RTC_BUF, FL_NOTHREAD )) == NULL ) { // open a private context (no RT listener!)
352 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
// NOTE(review): this fallback reads ctx->rtable directly, whereas the main loop below takes a
// reference with get_rt()/release_rt() while counting.
354 while( TRUE ) { // no listen port, just dump counts now and then
355 sleep( count_delay );
356 rt_epcounts( ctx->rtable, ctx->my_name );
364 cycle_snarfed_rt( ctx ); // cause the nrt to be opened
366 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rtc thread is running and listening; listening for rtg conns on %s\n", my_port );
// NOTE(review): the function header comment says counts are written every 180 seconds, but the
// count_delay/bump_freq comments say every 30s at first and every 5 minutes later; confirm which is right.
368 bump_freq = time( NULL ) + 300; // after 5 minutes we decrease the count frequency
370 while( 1 ) { // until the cows return, pigs fly, or somesuch event likely not to happen
371 while( msg == NULL || msg->len <= 0 ) { // until we actually have something from the other side
// Request mode: until route manager has sent data, open (once) a wormhole to rtg_addr and send
// a refresh request every rt_req_freq seconds.
372 if( (flags & RTCFL_HAVE_UPDATE) == 0 && time( NULL ) >= nxt_rt_req ) { // no route table updated from rt mgr; request one
373 if( ctx->rtg_whid < 0 ) {
374 ctx->rtg_whid = rmr_wh_open( pvt_cx, rtg_addr );
376 send_update_req( pvt_cx, ctx );
377 nxt_rt_req = time( NULL ) + rt_req_freq;
// Bounded wait so the housekeeping below runs even when nothing arrives.
380 msg = rmr_torcv_msg( pvt_cx, msg, 1000 );
382 if( time( NULL ) > blabber ) {
383 vlevel = refresh_vlevel( 0 );
384 blabber = time( NULL ) + count_delay; // set next time to blabber, then do so
385 if( blabber > bump_freq ) {
388 if( vlevel >= 0 ) { // allow it to be forced off with -n in verbose file
389 rt = get_rt( ctx ); // get active route table and up ref count
390 rt_epcounts( rt, ctx->my_name );
391 release_rt( ctx, rt ); // dec safely the ref counter
395 if( ctx->shutdown != 0 ) {
396 break; // mostly for unit test, but allows a forced stop
399 // extra housekeeping chores can be added here...
400 alarm_if_drops( ctx, pvt_cx ); // send an alarm if messages are dropping, clear if we set one and things are better
403 vlevel = refresh_vlevel( 0 ); // ensure it's fresh when we get a message
405 if( msg != NULL && msg->len > 0 ) {
406 rtc_parse_msg( ctx, pvt_cx, msg, vlevel, &flags );
409 if( ctx->shutdown ) { // mostly for testing, but allows user app to close us down if rmr_*() function sets this
415 return NULL; // unreachable, but some compilers don't see that and complain.
419 // this is nng specific inasmuch as we allow raw (non-RMR) messages
422 NOTE: This is the original rtc code when we supported "raw" nano/nng messages
423 from the route manager. It is deprecated in favour of managing all RM-RMR
424 communications via an RMR session.
426 The rtc() function above is the new and preferred function regardless
429 -----------------------------------------------------------------------------------
430 Route Table Collector
431 A side thread which opens a socket and subscribes to a routing table generator.
432 It may do other things along the way (latency measurements?).
434 The pointer is a pointer to the context.
436 Listens for records from the route table generation publisher, expecting
437 one of the following, newline terminated, ASCII records:
438 rte|msg-type||]name:port,name:port,...;name:port,... // route table entry with one or more groups of endpoints
439 new|start // start of new table
440 new|end // end of new table; complete
442 Name must be a host name which can be looked up via gethostbyname() (DNS).
444 Multiple endpoints (name:port) may be given separated by a comma; an endpoint is selected using round robin
445 for each message of the type that is sent.
447 Multiple endpoint groups can be given as a comma separated list of endpoints, separated by semicolons:
448 group1n1:port,group1n2:port,group1n3:port;group2n1:port,group2n2:port
450 If multiple groups are given, when send() is called for the corresponding message type,
451 the message will be sent to one endpoint in each group.
453 msg-type is the numeric message type (e.g. 102). If it is given as n,name then it is assumed
454 that the entry applies only to the instance running with the hostname 'name.'
456 Buffers received from the route table generator can contain multiple newline terminated
457 records, but each buffer must be less than 4K in length, and the last record in a
458 buffer may NOT be split across buffers.
461 In addition to the primary task of getting, vetting, and installing a new route table, or
462 updates to the existing table, this thread will periodically cause the send counts for each
463 endpoint known to be written to standard error. The frequency is once every 180 seconds, and
464 more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
// raw_rtc: deprecated route table collector that accepts raw (non-RMR) NNG/Nano messages.
//   vctx - the user's uta_ctx_t, where the route table is pinned; NULL is logged as critical.
// Seeds the table from the static file, then listens on the port taken from ENV_RTG_PORT, or on
// DEF_RTG_PORT when that is unset. The port accepts "port", "tcp:port" or ":port" forms.
// An epoll fd (when it can be set up) is used as a 1 second timeout so counts and alarms are
// produced even without updates.
// Received buffers are split on newlines and each record is handed to parse_rt_rec().
// Returns NULL; a non-zero ctx->shutdown is the visible exit condition.
// NOTE(review): several lines (declarations of ntoks/tokens/mlen/pbuf, else branches, braces)
// are not visible in this listing; comments cover only the visible lines.
466 static void* raw_rtc( void* vctx ) {
467 uta_ctx_t* ctx; // context user has -- where we pin the route table
468 uta_ctx_t* pvt_cx; // private context for session with rtg
469 rmr_mbuf_t* msg = NULL; // message from rtg
470 route_table_t* rt; // the routing table that will be traversed to print statistics
471 char* payload; // payload in the message
473 char* port; // a port number we listen/connect to
474 char* fport; // pointer to the real buffer to free
475 size_t buf_size; // nng needs var pointer not just size?
476 char* nextr; // pointer at next record in the message
477 char* curr; // current record
479 long blabber = 0; // time of last blabber so we don't flood if rtg goes bad
480 int cstate = -1; // connection state to rtg
481 int state; // processing state of some nng function
485 int pbuf_size = 0; // number allocated in pbuf
487 int raw_interface = 1; // rtg is using raw NNG/Nano not RMr to send updates
488 int vlevel = 0; // how chatty we should be 0== no nattering allowed
490 int epfd = -1; // fd for epoll so we can multi-task
491 struct epoll_event events[1]; // list of events to give to epoll; we only have one we care about
492 struct epoll_event epe; // event definition for event to listen to
493 int rcv_fd = -1; // pollable file des from NNG to use for timeout
494 int count_delay = 30; // number of seconds between writing count info; initially every 30s
495 int bump_freq = 0; // time at which we will bump count frequency to every 5 minutes
498 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
499 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
503 vlevel = refresh_vlevel( 0 );
504 read_static_rt( ctx, vlevel ); // seed the route table if one provided
// Both branches strdup() so the tokeniser below can modify the string; fport keeps the base pointer.
506 if( (port = getenv( ENV_RTG_PORT )) == NULL || ! *port ) { // port we need to open to listen for RTG connections
507 port = strdup( DEF_RTG_PORT );
509 port = strdup( port );
512 fport = port; // must hold to free
514 ntoks = uta_tokenise( port, tokens, 120, ':' ); // assume tcp:port, but it could be port or old style host:port
517 port = tokens[0]; // just the port
521 port = tokens[1]; // tcp:port or :port
525 port = DEF_RTG_PORT; // this shouldn't happen, but paranoia is good
529 if( (pvt_cx = init( port, MAX_RTG_MSG_SZ, FL_NOTHREAD )) == NULL ) { // open a private context
530 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
// NOTE(review): reads ctx->rtable directly rather than via get_rt()/release_rt() as the main loop does.
532 while( TRUE ) { // no listen port, just dump counts now and then
533 sleep( count_delay );
534 rt_epcounts( ctx->rtable, ctx->my_name );
// NOTE(review): no exit from the while( TRUE ) above is visible, so this free appears unreachable;
// no free of fport is visible on the success path either.
537 free( fport ); // paranoid free and return
// Best effort: set up an epoll on the session's receive fd so the loop can time out every second.
541 if( (rcv_fd = rmr_get_rcvfd( pvt_cx )) >= 0 ) { // get the epoll fd for the rtg socket
543 rmr_vlog( RMR_VL_WARN, "cannot get epoll fd for rtg session; stats will generate only after update from rt manager\n" );
545 if( (epfd = epoll_create1( 0 )) < 0 ) {
546 rmr_vlog( RMR_VL_WARN, "stats will generate only after rt manager update; unable to create epoll fd for rtg session: %s\n", strerror( errno ) );
549 epe.events = EPOLLIN;
550 epe.data.fd = rcv_fd;
552 if( epoll_ctl( epfd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 ) {
553 rmr_vlog( RMR_VL_WARN, "stats will generate only after rt manager update; unable to init epoll_ctl: %s\n", strerror( errno ) );
560 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rtc thread is running and listening; listening for rtg conns on %s\n", port );
563 // future: if we need to register with the rtg, then build a message and send it through a wormhole here
565 bump_freq = time( NULL ) + 300; // after 5 minutes we decrease the count frequency
567 while( 1 ) { // until the cows return, pigs fly, or somesuch event likely not to happen
568 while( msg == NULL || msg->len <= 0 ) { // until we actually have something from the other side
569 if( rcv_fd < 0 || epoll_wait( epfd, events, 1, 1000 ) > 0 ) { // skip epoll if init failed, else block for max 1 sec
570 if( raw_interface ) {
571 msg = (rmr_mbuf_t *) rcv_payload( pvt_cx, msg ); // receive from non-RMr sender
573 msg = rmr_rcv_msg( pvt_cx, msg ); // receive from an RMr sender
575 } else { // no msg, do extra tasks
576 if( msg != NULL ) { // if we were working with a message; ensure no len
578 msg->state = RMR_ERR_TIMEOUT;
// Unlike rtc(), blabber is only advanced when vlevel >= 0, so with a negative level the verbose
// file is re-read on every pass through here.
582 if( time( NULL ) > blabber ) {
583 vlevel = refresh_vlevel( 0 );
584 if( vlevel >= 0 ) { // allow it to be forced off with -n in verbose file
585 blabber = time( NULL ) + count_delay; // set next time to blabber, then do so
586 if( blabber > bump_freq ) {
589 rt = get_rt( ctx ); // get active route table and up ref count
590 rt_epcounts( rt, ctx->my_name );
591 release_rt( ctx, rt ); // dec safely the ref counter
// NOTE(review): rtc() calls alarm_if_drops( ctx, pvt_cx ) with two arguments; this one-argument call
// cannot match the same prototype unless this function is compiled under a different configuration.
595 alarm_if_drops( ctx ); // send an alarm if messages are dropping, clear if we set one and things are better
598 vlevel = refresh_vlevel( 0 ); // ensure it's fresh when we get a message
600 if( msg != NULL && msg->len > 0 ) {
601 payload = msg->payload;
602 mlen = msg->len; // usable bytes in the payload
// NOTE(review): msg->payload is printed with %s before it is copied into the nil-terminated pbuf;
// if the sender did not nil-terminate it this can read past the payload (guard line not visible).
604 rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes (%s)\n", (int) mlen, msg->payload );
606 if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes\n", (int) mlen );
// Grow when pbuf_size <= mlen so there is always room for the nil written at pbuf[mlen].
609 if( pbuf_size <= mlen ) {
616 pbuf_size = mlen * 2;
// NOTE(review): no NULL check on the malloc result is visible before the memcpy below.
618 pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
620 memcpy( pbuf, payload, mlen );
621 pbuf[mlen] = 0; // don't depend on sender making this a legit string
624 while( curr ) { // loop over each record in the buffer
625 nextr = strchr( curr, '\n' ); // allow multiple newline records, find end of current and mark
632 rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr );
634 if( raw_interface ) {
635 parse_rt_rec( ctx, NULL, curr, vlevel, NULL ); // nil pvt to parser as we can't ack messages
637 parse_rt_rec( ctx, pvt_cx, curr, vlevel, msg ); // parse record and add to in progress table
643 if( ctx->shutdown ) { // mostly for testing, but allows user app to close us down if rmr_*() function sets this
647 msg->len = 0; // force back into the listen loop
651 return NULL; // unreachable, but some compilers don't see that and complain.