1 // : vi ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019-2020 Nokia
5 Copyright (c) 2018-2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: rt_collector.c
23 Abstract: The route table collector is started as a separate pthread and
24 is responsible for listening for route table updates from a
25 route manager or route table generator process.
27 Author: E. Scott Daniels
28 Date: 29 November 2018 (extracted to common 13 March 2019)
31 #ifndef _rt_collector_c
32 #define _rt_collector_c
41 #include <sys/types.h>
45 #include <RIC_message_types.h> // needed for RMR/Rt Mgr msg types
47 // ---- local constants ------------------
// RTCFL_HAVE_UPDATE is a bit in the collector's local flags word; while it is clear the collector keeps requesting a table.
49 #define RTCFL_HAVE_UPDATE 0x01 // an update from RM was received
// MAX_RTC_BUF is parenthesised so it expands as a single value inside larger expressions
// (unparenthesised, x / MAX_RTC_BUF would parse as (x / 5) * 1024).
51 #define MAX_RTC_BUF (5 * 1024) // max buffer size we'll expect is 4k, add some fudge room
53 // ------------------------------------------------------------------------------------------------
56 Opens the vlevel control file if needed and reads the vlevel from it.
57 The file is rewound if already open so that external updates are captured.
58 The current level is returned; 0 on error.
60 The environment variable (ENV_VERBOSE_FILE) is used to supply the file to
61 open and read. If missing, we will try /tmp/rmr.v. We will try to open the file
62 on each call if not already open; this allows the value to be supplied after
63 start which helps debugging.
65 If close_file is true, then we will close the open vfd and return 0;
// Return the current verbosity level read from the vlevel control file.
// NOTE(review): this listing omits several lines of the function (e.g. the declarations
// of vfd/eptr/vlevel, the close_file handling and the return); the lines below are not
// a complete, compilable definition.
67 extern int refresh_vlevel( int close_file ) {
71 char wbuf[128]; // read buffer; MUST be 11 or greater
// The file name comes from ENV_VERBOSE_FILE when set, otherwise /tmp/rmr.v.
82 if( vfd < 0 ) { // attempt to find/open on all calls if not open
83 if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
84 vfd = open( eptr, O_RDONLY );
86 vfd = open( "/tmp/rmr.v", O_RDONLY );
// Zero wbuf[0..10] so the (at most) 10-byte read below always leaves a terminator in wbuf[10].
93 memset( wbuf, 0, sizeof( char ) * 11 ); // ensure what we read will be nil terminated
// Rewind before every read so edits made to the file after it was opened are picked up.
94 if( lseek( vfd, 0, SEEK_SET ) == 0 && read( vfd, wbuf, 10 ) > 0 ) {
// NOTE(review): atoi() yields 0 for non-numeric text and cannot report conversion errors; strtol() would allow validation.
95 vlevel = atoi( wbuf );
102 Loop forever (assuming we're running in a pthread reading the static table
// File-only collector: seeds the route table from the static file via read_static_rt()
// with CFL_NO_RTACK set (no acks are attempted for file input), and closes the verbose
// file once ctx->shutdown becomes non-zero.
// NOTE(review): this listing omits lines of the function (e.g. the surrounding loop,
// returns and closing braces); it is not compilable as shown.
105 static void* rtc_file( void* vctx ) {
106 uta_ctx_t* ctx; // context user has -- where we pin the route table
108 int vlevel = 0; // how chatty we should be 0== no nattering allowed
112 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
113 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
117 ctx->flags |= CFL_NO_RTACK; // no attempt to ack when reading from a file
// Re-read the verbosity level so the seed-table load honours the current setting.
119 vlevel = refresh_vlevel( 0 );
120 read_static_rt( ctx, vlevel ); // seed the route table if one provided
122 if( ctx->shutdown != 0 ) { // allow for graceful termination and unit testing
123 refresh_vlevel( 1 ); // close the verbose file if open
131 Rtc_parse_msg parses a single message from the route manager. We allow multiple, newline terminated,
132 records in each message; it is required that the last record in the message be complete (we do not
133 reconstruct records split over multiple messages). For each record, we call the record parser
134 to parse and add the information to the table being built.
136 This function was broken from the main rtc() function in order to be able to unit test it. Without
137 this as a standalone function, it was impossible to simulate a message arriving on the RTC's private
140 To reduce malloc/free cycles, we allocate a static work buffer and expand it when needed; in other
141 words, this is not thread safe but it shouldn't need to be.
// Parse one route-manager message. For RMRRM_TABLE_DATA the payload is copied into a
// reusable static buffer, nil-terminated, and walked record by record (records are
// newline separated), each record being handed to parse_rt_rec(); other message types
// reach the "invalid message type" warning at the end (presumably the default case).
// NOTE(review): this listing omits lines of the function (e.g. declarations of mlen/curr,
// the record-splitting details, breaks and closing braces); it is not compilable as shown.
143 static void rtc_parse_msg( uta_ctx_t *ctx, uta_ctx_t* pvt_cx, rmr_mbuf_t* msg, int vlevel, int* flags ) {
144 static unsigned char* pbuf = NULL;
145 static int pbuf_size = 0;
147 unsigned char* payload;
149 unsigned char* nextr;
152 payload = msg->payload;
153 mlen = msg->len; // usable bytes in the payload
155 if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: received rt message type=%d len=%d\n", msg->mtype, (int) mlen );
156 switch( msg->mtype ) {
157 case RMRRM_TABLE_DATA:
// The first table message marks that Route Manager has started sending; the flag also stops further table requests.
158 if( (*flags & RTCFL_HAVE_UPDATE) == 0 ) {
159 *flags |= RTCFL_HAVE_UPDATE;
160 rmr_vlog( RMR_VL_INFO, "message flow from route manager starts\n" );
// '<=' (not '<') keeps room for the terminator written at pbuf[mlen] below.
// NOTE(review): if mlen were 0 while pbuf_size is 0, this would malloc(0) and then write pbuf[0],
// unless an omitted line guards it; rtc() only calls this with msg->len > 0, but direct callers should not pass an empty message.
163 if( pbuf_size <= mlen ) {
170 pbuf_size = mlen * 2;
// NOTE(review): the (char *) cast does not match pbuf's unsigned char* type, and no NULL check
// of the malloc result is visible before the memcpy below.
172 pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
174 memcpy( pbuf, payload, mlen );
175 pbuf[mlen] = 0; // don't depend on sender making this a legit string
177 rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: rt message: (%s)\n", pbuf );
181 while( curr ) { // loop over each record in the buffer
182 nextr = strchr( (char *) curr, '\n' ); // allow multiple newline records, find end of current and mark
189 rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc_parse_msg: processing (%s)\n", curr );
191 parse_rt_rec( ctx, pvt_cx, curr, vlevel, msg ); // parse record and add to in progress table; ack using rts to msg
196 msg->len = 0; // force back into the listen loop
200 rmr_vlog( RMR_VL_WARN, "rmr_rtc: invalid message type=%d len=%d\n", msg->mtype, (int) msg->len );
206 Route Table Collector
207 A side thread which either attempts to connect and request a table
208 from the Route Manager, or opens a port and listens for Route Manager
209 to push table updates.
211 It may do other things along the way (latency measurements, alarms,
212 respond to RMR pings, etc.).
214 The behaviour with respect to listening for Route Manager updates vs
215 the initiation of the connection and sending a request depends on the
216 value of the ENV_RTG_ADDR (RMR_RTG_SVC) environment variable. If
217 host:port, or IP:port, is given, then we assume that we make the connection
218 and send a request for the table (request mode). If the variable is just
219 a port, then we assume Route Manager will connect and push updates (original
222 If the variable is not defined, the default behaviour, in order to be
223 backwards compatible, depends on the presence of the ENV_CTL_PORT
224 (RMR_CTL_PORT) variable (new with the support for requesting a table).
227 ENV_CTL_PORT ENV_RTG_ADDR Behaviour
228 unset unset Open default CTL port (DEF_CTL_PORT) and
229 wait for Rt Mgr to push tables
231 set unset Use the default Rt Mgr wellknown addr
232 and port (DEF_RTG_WK_ADDR) to connect
233 and request a table. The control port
234 used is the value set by ENV_CTL_PORT.
236 unset set As described above. The default control
237 port (DEF_CTL_PORT) is used.
239 When we are running in request mode, then we will send the RMR message
240 RMRRM_REFRESH to this address (wormhole) as a request for the route manager
241 to send a new table. We will attempt to connect and send requests until
242 we have a table. Calls to rmr_ready() will report FALSE until a table is
243 loaded _unless_ a seed table was given.
245 Route table information is expected to arrive on RMR messages with type
246 RMRRM_TABLE_DATA. There is NOT a specific message type for each possible
247 table record, so the payload is as it appears in the seed file or as
248 delivered in old versions. It may take several RMRRM_TABLE_DATA messages
249 to completely supply a new table or table update. See the header for parse_rt_rec
250 in common for a description of possible message contents.
252 Buffers received from the route table generator can contain multiple newline terminated
253 records, but each buffer must be less than 4K in length, and the last record in a
254 buffer may NOT be split across buffers.
257 In addition to the primary task of getting, vetting, and installing a new route table, or
258 updates to the existing table, this thread will periodically cause the send counts for each
259 endpoint known to be written to standard error. The frequency is once every 180 seconds, and
260 more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
// Route table collector thread body using an RMR session with Route Manager.
// It does the following:
//   - seeds the table from the static file;
//   - derives the control port and Route Manager address from the environment;
//   - loops receiving messages, which are handed to rtc_parse_msg();
//   - until a table update has arrived, periodically sends a table request through a wormhole;
//   - periodically dumps endpoint counts.
// NOTE(review): this listing omits many lines (e.g. declarations of eptr, flags, ntoks and
// tokens, the switch header, else branches and closing braces); it is not compilable as shown.
262 static void* rtc( void* vctx ) {
263 uta_ctx_t* ctx; // context user has -- where we pin the route table
264 uta_ctx_t* pvt_cx; // private context for session with rtg
265 rmr_mbuf_t* msg = NULL; // message from rtg
266 char* my_port; // the port number that we will listen on (4561 has been the default for this)
267 char* rtg_addr; // host:port address of route table generator (route manager)
268 char* daddr; // duplicated rtg address string to parse/trash
269 size_t buf_size; // nng needs var pointer not just size?
271 long blabber = 0; // time of last blabber so we don't flood if rtg goes bad
272 int cstate = -1; // connection state to rtg
273 int state; // processing state of some nng function
277 int vlevel = 0; // how chatty we should be 0== no nattering allowed
279 int epfd = -1; // fd for epoll so we can multi-task
280 struct epoll_event events[1]; // list of events to give to epoll; we only have one we care about
281 struct epoll_event epe; // event definition for event to listen to
282 int count_delay = 30; // number of seconds between writing count info; initially every 30s
283 int bump_freq = 0; // time at which we will bump count frequency to every 5 minutes
285 int rt_req_freq = DEF_RTREQ_FREQ; // request frequency (sec) when wanting a new table
286 int nxt_rt_req = 0; // time of next request
289 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
290 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
294 vlevel = refresh_vlevel( 0 );
// NOTE(review): the macro is spelled ENV_RTREQ_FREA; presumably this matches its definition in a header not shown -- confirm.
296 if( (eptr = getenv( ENV_RTREQ_FREA )) != NULL ) {
// atoi() yields 0 for non-numeric text, which the 1-300 range check below then rejects.
297 rt_req_freq = atoi( eptr );
298 if( rt_req_freq < 1 || rt_req_freq > 300 ) {
299 rt_req_freq = DEF_RTREQ_FREQ;
// NOTE(review): this passes DEF_RTREQ_FREQ as the "out of range" value, so the user's rejected
// setting is never reported; unlike most messages in this file the format also lacks a trailing newline.
300 rmr_vlog( RMR_VL_WARN, "rmr_rtc: RT request frequency (%d) out of range (1-300), using default", DEF_RTREQ_FREQ );
// NOTE(review): no trailing newline in this format either.
303 rmr_vlog( RMR_VL_INFO, "rmr_rtc: RT request frequency set to: %d seconds", rt_req_freq );
305 ctx->flags |= CFL_NO_RTACK; // don't ack when reading from a file
306 read_static_rt( ctx, vlevel ); // seed the route table if one provided
307 ctx->flags &= ~CFL_NO_RTACK;
310 my_port = getenv( ENV_CTL_PORT ); // default port to listen on (likely 4561)
311 if( my_port == NULL || ! *my_port ) { // if undefined, then go with default
312 my_port = DEF_CTL_PORT;
313 daddr = DEF_CTL_PORT; // backwards compat; if ctl port not hard defined, default is to listen
315 daddr = DEF_RTG_WK_ADDR; // if ctl port is defined, then default changes to connecting to well known RM addr
318 if( (rtg_addr = getenv( ENV_RTG_ADDR )) == NULL || ! *rtg_addr ) { // undefined, use default set above
// NOTE(review): the strdup() result is not checked for NULL before it is tokenised, and no free() of daddr is visible in this listing.
322 daddr = strdup( rtg_addr ); // dup to destroy during parse
324 ntoks = uta_tokenise( daddr, tokens, 120, ':' ); // should be host:port of rt mgr (could be port only which we assume is old listen port)
326 case 0: // should not happen, but prevent accidents and allow default to ignore additional tokens
330 my_port = tokens[0]; // just port -- assume backlevel environment where we just listen
331 flags |= RTCFL_HAVE_UPDATE; // prevent sending update requests
335 if( strcmp( tokens[0], "tcp" ) == 0 ) { // old school nng tcp:xxxx so we listen on xxx
336 flags |= RTCFL_HAVE_UPDATE; // and signal not to try to request an update
339 // rtg_addr points at rt mgr address and my port set from env or default stands as is
344 if( (pvt_cx = init( my_port, MAX_RTC_BUF, FL_NOTHREAD )) == NULL ) { // open a private context (no RT listener!)
345 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
347 while( TRUE ) { // no listen port, just dump counts now and then
348 sleep( count_delay );
349 rt_epcounts( ctx->rtable, ctx->my_name );
357 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rtc thread is running and listening; listening for rtg conns on %s\n", my_port );
359 bump_freq = time( NULL ) + 300; // after 5 minutes we decrease the count frequency
361 while( 1 ) { // until the cows return, pigs fly, or somesuch event likely not to happen
362 while( msg == NULL || msg->len <= 0 ) { // until we actually have something from the other side
// Until an update has arrived, send a table request every rt_req_freq seconds, opening the wormhole to rtg_addr on first use.
363 if( (flags & RTCFL_HAVE_UPDATE) == 0 && time( NULL ) >= nxt_rt_req ) { // no route table updated from rt mgr; request one
364 if( ctx->rtg_whid < 0 ) {
365 ctx->rtg_whid = rmr_wh_open( pvt_cx, rtg_addr );
367 send_update_req( pvt_cx, ctx );
368 nxt_rt_req = time( NULL ) + rt_req_freq;
371 msg = rmr_torcv_msg( pvt_cx, msg, 1000 );
// Periodic housekeeping: refresh the verbosity level and dump endpoint counts at most once per count_delay seconds.
373 if( time( NULL ) > blabber ) {
374 vlevel = refresh_vlevel( 0 );
375 blabber = time( NULL ) + count_delay; // set next time to blabber, then do so
376 if( blabber > bump_freq ) {
379 if( vlevel >= 0 ) { // allow it to be forced off with -n in verbose file
380 rt_epcounts( ctx->rtable, ctx->my_name );
384 if( ctx->shutdown != 0 ) {
385 break; // mostly for unit test, but allows a forced stop
389 vlevel = refresh_vlevel( 0 ); // ensure it's fresh when we get a message
391 if( msg != NULL && msg->len > 0 ) {
392 rtc_parse_msg( ctx, pvt_cx, msg, vlevel, &flags );
395 if( ctx->shutdown ) { // mostly for testing, but allows user app to close us down if rmr_*() function sets this
401 return NULL; // unreachable, but some compilers don't see that and complain.
405 // this is nng specific inasmuch as we allow raw (non-RMR) messages
408 NOTE: This is the original rtc code when we supported "raw" nano/nng messages
409 from the route manager. It is deprecated in favour of managing all RM-RMR
410 communications via an RMR session.
412 The rtc() function above is the new and preferred function regardless
415 -----------------------------------------------------------------------------------
416 Route Table Collector
417 A side thread which opens a socket and subscribes to a routing table generator.
418 It may do other things along the way (latency measurements?).
420 The pointer is a pointer to the context.
422 Listens for records from the route table generation publisher, expecting
423 one of the following, newline terminated, ASCII records:
424 rte|msg-type||]name:port,name:port,...;name:port,... // route table entry with one or more groups of endpoints
425 new|start // start of new table
426 new|end // end of new table; complete
428 Name must be a host name which can be looked up via gethostbyname() (DNS).
430 Multiple endpoints (name:port) may be given separated by a comma; an endpoint is selected using round robin
431 for each message of the type that is sent.
433 Multiple endpoint groups can be given as a comma separated list of endpoints, separated by semicolons:
434 group1n1:port,group1n2:port,group1n3:port;group2n1:port,group2n2:port
436 If multiple groups are given, when send() is called for the corresponding message type,
437 the message will be sent to one endpoint in each group.
439 msg-type is the numeric message type (e.g. 102). If it is given as n,name then it is assumed
440 that the entry applies only to the instance running with the hostname 'name.'
442 Buffers received from the route table generator can contain multiple newline terminated
443 records, but each buffer must be less than 4K in length, and the last record in a
444 buffer may NOT be split across buffers.
447 In addition to the primary task of getting, vetting, and installing a new route table, or
448 updates to the existing table, this thread will periodically cause the send counts for each
449 endpoint known to be written to standard error. The frequency is once every 180 seconds, and
450 more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
// Deprecated collector that accepts raw (non-RMR) NNG/Nano messages from the route table
// generator (raw_interface is 1); rtc() is the preferred path. It seeds the table from the
// static file, listens on the port from ENV_RTG_PORT (or DEF_RTG_PORT), and hands each
// newline-separated record of a received buffer to parse_rt_rec().
// NOTE(review): this listing omits many lines (e.g. declarations of mlen, ntoks, tokens and pbuf,
// the switch header, else branches and closing braces); it is not compilable as shown.
452 static void* raw_rtc( void* vctx ) {
453 uta_ctx_t* ctx; // context user has -- where we pin the route table
454 uta_ctx_t* pvt_cx; // private context for session with rtg
455 rmr_mbuf_t* msg = NULL; // message from rtg
456 char* payload; // payload in the message
458 char* port; // a port number we listen/connect to
459 char* fport; // pointer to the real buffer to free
460 size_t buf_size; // nng needs var pointer not just size?
461 char* nextr; // pointer at next record in the message
462 char* curr; // current record
464 long blabber = 0; // time of last blabber so we don't flood if rtg goes bad
465 int cstate = -1; // connection state to rtg
466 int state; // processing state of some nng function
470 int pbuf_size = 0; // number allocated in pbuf
472 int raw_interface = 1; // rtg is using raw NNG/Nano not RMr to send updates
473 int vlevel = 0; // how chatty we should be 0== no nattering allowed
475 int epfd = -1; // fd for epoll so we can multi-task
476 struct epoll_event events[1]; // list of events to give to epoll; we only have one we care about
477 struct epoll_event epe; // event definition for event to listen to
478 int rcv_fd = -1; // pollable file descriptor from NNG to use for timeout
479 int count_delay = 30; // number of seconds between writing count info; initially every 30s
480 int bump_freq = 0; // time at which we will bump count frequency to every 5 minutes
483 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
484 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
488 vlevel = refresh_vlevel( 0 );
489 read_static_rt( ctx, vlevel ); // seed the route table if one provided
// The port string is always duplicated so it can be tokenised in place; fport keeps the allocation for the later free().
491 if( (port = getenv( ENV_RTG_PORT )) == NULL || ! *port ) { // port we need to open to listen for RTG connections
492 port = strdup( DEF_RTG_PORT );
494 port = strdup( port );
497 fport = port; // must hold to free
499 ntoks = uta_tokenise( port, tokens, 120, ':' ); // assume tcp:port, but it could be port or old style host:port
502 port = tokens[0]; // just the port
506 port = tokens[1]; // tcp:port or :port
510 port = DEF_RTG_PORT; // this shouldn't happen, but paranoia is good
514 if( (pvt_cx = init( port, MAX_RTG_MSG_SZ, FL_NOTHREAD )) == NULL ) { // open a private context
515 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
517 while( TRUE ) { // no listen port, just dump counts now and then
518 sleep( count_delay );
519 rt_epcounts( ctx->rtable, ctx->my_name );
522 free( fport ); // paranoid free and return
// Set up epoll on the session's receive fd so the loop can time out and do housekeeping when no message arrives.
526 if( (rcv_fd = rmr_get_rcvfd( pvt_cx )) >= 0 ) { // get the epoll fd for the rtg socket
528 rmr_vlog( RMR_VL_WARN, "cannot get epoll fd for rtg session; stats will generate only after update from rt manager\n" );
530 if( (epfd = epoll_create1( 0 )) < 0 ) {
531 rmr_vlog( RMR_VL_WARN, "stats will generate only after rt manager update; unable to create epoll fd for rtg session: %s\n", strerror( errno ) );
534 epe.events = EPOLLIN;
535 epe.data.fd = rcv_fd;
537 if( epoll_ctl( epfd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 ) {
538 rmr_vlog( RMR_VL_WARN, "stats will generate only after rt manager update; unable to init epoll_ctl: %s\n", strerror( errno ) );
545 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rtc thread is running and listening; listening for rtg conns on %s\n", port );
548 // future: if we need to register with the rtg, then build a message and send it through a wormhole here
550 bump_freq = time( NULL ) + 300; // after 5 minutes we decrease the count frequency
552 while( 1 ) { // until the cows return, pigs fly, or somesuch event likely not to happen
553 while( msg == NULL || msg->len <= 0 ) { // until we actually have something from the other side
// NOTE(review): if rcv_fd is valid but epoll_create1() or epoll_ctl() failed above, this still relies on
// epoll_wait() with an unusable epfd unless an omitted line resets rcv_fd -- confirm. Otherwise the wait
// never reports readiness and no message would be received.
554 if( rcv_fd < 0 || epoll_wait( epfd, events, 1, 1000 ) > 0 ) { // skip epoll if init failed, else block for max 1 sec
555 if( raw_interface ) {
556 msg = (rmr_mbuf_t *) rcv_payload( pvt_cx, msg ); // receive from non-RMr sender
558 msg = rmr_rcv_msg( pvt_cx, msg ); // receive from an RMr sender
560 } else { // no msg, do extra tasks
561 if( msg != NULL ) { // if we were working with a message; ensure no len
563 msg->state = RMR_ERR_TIMEOUT;
567 if( time( NULL ) > blabber ) {
568 vlevel = refresh_vlevel( 0 );
569 if( vlevel >= 0 ) { // allow it to be forced off with -n in verbose file
570 blabber = time( NULL ) + count_delay; // set next time to blabber, then do so
571 if( blabber > bump_freq ) {
574 rt_epcounts( ctx->rtable, ctx->my_name );
579 vlevel = refresh_vlevel( 0 ); // ensure it's fresh when we get a message
581 if( msg != NULL && msg->len > 0 ) {
582 payload = msg->payload;
583 mlen = msg->len; // usable bytes in the payload
// NOTE(review): msg->payload is printed with %s before it is copied and nil-terminated below (the copy
// exists precisely because the sender may not terminate it), so this can read past the payload.
// Bounding the print with "%.*s" and mlen would avoid that.
585 rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes (%s)\n", (int) mlen, msg->payload );
587 if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes\n", (int) mlen );
// '<=' (not '<') keeps room for the terminator written at pbuf[mlen] below.
590 if( pbuf_size <= mlen ) {
597 pbuf_size = mlen * 2;
// NOTE(review): no NULL check of the malloc result is visible before the memcpy below.
599 pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
601 memcpy( pbuf, payload, mlen );
602 pbuf[mlen] = 0; // don't depend on sender making this a legit string
605 while( curr ) { // loop over each record in the buffer
606 nextr = strchr( curr, '\n' ); // allow multiple newline records, find end of current and mark
613 rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr );
615 if( raw_interface ) {
616 parse_rt_rec( ctx, NULL, curr, vlevel, NULL ); // nil pvt to parser as we can't ack messages
618 parse_rt_rec( ctx, pvt_cx, curr, vlevel, msg ); // parse record and add to in progress table
624 if( ctx->shutdown ) { // mostly for testing, but allows user app to close us down if rmr_*() function sets this
628 msg->len = 0; // force back into the listen loop
632 return NULL; // unreachable, but some compilers don't see that and complain.