1 // : vi ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019-2020 Nokia
5 Copyright (c) 2018-2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: rt_collector.c
23 Abstract: The route table collector is started as a separate pthread and
24 is responsible for listening for route table updates from a
25 route manager or route table generator process.
27 Author: E. Scott Daniels
28 Date: 29 November 2018 (extracted to common 13 March 2019)
31 #ifndef _rt_collector_c
32 #define _rt_collector_c
41 #include <sys/types.h>
45 #include <RIC_message_types.h> // needed for RMR/Rt Mgr msg types
47 // ---- local constants ------------------
// Bit in the collector's local flags word: set once a table update has been
// received from the route manager (RM).
#define RTCFL_HAVE_UPDATE 0x01

// Largest buffer we expect from the route manager is 4k; 5k adds some fudge room.
// The expansion is parenthesised so that the macro evaluates as a single value
// inside larger expressions (e.g. x / MAX_RTC_BUF or MAX_RTC_BUF % n).
#define MAX_RTC_BUF (5 * 1024)
53 // ------------------------------------------------------------------------------------------------
56 Loop forever (assuming we're running in a pthread reading the static table
// Collector entry point used when the route table comes from a static file.
// The table in the user context is seeded through read_static_rt() and the
// shutdown field of the context is checked so that the thread can be stopped.
//
// vctx is the user context (uta_ctx_t) passed as a void pointer; a nil
// pointer is reported with a critical log message.
59 static void* rtc_file( void* vctx ) {
60 uta_ctx_t* ctx; // user context -- where we pin the route table
62 int vfd = -1; // verbose file descriptor; stays -1 when no file is opened
63 int vlevel = 0; // how chatty we should be; 0 == no nattering allowed
67 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
68 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
// the verbose file is optional; its name is taken from the ENV_VERBOSE_FILE environment variable
72 if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
73 vfd = open( eptr, O_RDONLY );
76 ctx->flags |= CFL_NO_RTACK; // no attempt to ack when reading from a file
// 11 bytes are zeroed and at most 10 are read from the start of the file, so the text handed to atoi() is always nul terminated
79 memset( wbuf, 0, sizeof( char ) * 11 );
80 if( lseek( vfd, 0, SEEK_SET ) == 0 && read( vfd, wbuf, 10 ) > 0 ) {
81 vlevel = atoi( wbuf );
85 read_static_rt( ctx, vlevel ); // seed the route table if one provided
87 if( ctx->shutdown != 0 ) { // allow for graceful termination and unit testing
// Read the current verbosity level from the verbose file.
//
// vfd is the open descriptor of the verbose file; when it is negative the
// file is not read at all.
// NOTE(review): the local declarations and the return statement are not
// visible in this view; presumably vlevel starts at 0 and is returned -- confirm.
97 static int refresh_vlevel( int vfd ) {
101 if( vfd >= 0 ) { // if file is open, read current value
// rewind and take at most 10 bytes; 11 bytes were zeroed so the string given to atoi() is terminated
103 memset( rbuf, 0, sizeof( char ) * 11 );
104 if( lseek( vfd, 0, SEEK_SET ) == 0 && read( vfd, rbuf, 10 ) > 0 ) {
105 vlevel = atoi( rbuf ); // only updated when both the seek and the read succeed
113 Rtc_parse_msg parses a single message from the route manager. We allow multiple, newline terminated,
114 records in each message; it is required that the last record in the message be complete (we do not
115 reconstruct records split over multiple messages). For each record, we call the record parser
116 to parse and add the information to the table being built.
118 This function was broken from the main rtc() function in order to be able to unit test it. Without
119 this as a standalone function, it was impossible to simulate a message arriving on the RTC's private
122 To reduce malloc/free cycles, we allocate a static work buffer and expand it when needed; in other
123 words, this is not thread safe but it shouldn't need to be.
// Parse a single message received from the route manager.
//
//   ctx     user context; passed through to parse_rt_rec()
//   pvt_cx  private context; passed through to parse_rt_rec()
//   msg     received message; payload, len and mtype are read, and len is set
//           to 0 to force the caller back into its listen loop
//   vlevel  verbosity level; a value > 0 enables the received-message debug log
//   flags   caller's flag word; RTCFL_HAVE_UPDATE is turned on when the first
//           RMRRM_TABLE_DATA message is seen
//
// The work buffer is static and reused between calls, so this function is not
// thread safe.
125 static void rtc_parse_msg( uta_ctx_t *ctx, uta_ctx_t* pvt_cx, rmr_mbuf_t* msg, int vlevel, int* flags ) {
126 static unsigned char* pbuf = NULL; // work buffer kept across calls
127 static int pbuf_size = 0; // number of bytes currently allocated to pbuf
129 unsigned char* payload; // payload in the message
131 unsigned char* nextr; // result of the newline search in the current record
134 payload = msg->payload;
135 mlen = msg->len; // usable bytes in the payload
137 if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: received rt message type=%d len=%d\n", msg->mtype, (int) mlen );
138 switch( msg->mtype ) {
139 case RMRRM_TABLE_DATA:
140 if( (*flags & RTCFL_HAVE_UPDATE) == 0 ) { // first table data message seen; note it once
141 *flags |= RTCFL_HAVE_UPDATE;
142 rmr_vlog( RMR_VL_INFO, "message flow from route manager starts\n" );
// the buffer must be strictly larger than the message so that the nul written at pbuf[mlen] fits
145 if( pbuf_size <= mlen ) {
152 pbuf_size = mlen * 2; // new size is twice the length of this message
// NOTE(review): the cast is to char* while pbuf is unsigned char*, and no check of the malloc result is visible here -- confirm
154 pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
156 memcpy( pbuf, payload, mlen );
157 pbuf[mlen] = 0; // don't depend on sender making this a legit string
159 rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: rt message: (%s)\n", pbuf );
163 while( curr ) { // loop over each record in the buffer
164 nextr = strchr( (char *) curr, '\n' ); // allow multiple newline records, find end of current and mark
171 rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr );
173 parse_rt_rec( ctx, pvt_cx, curr, vlevel, msg ); // parse record and add to in progress table; ack using rts to msg
178 msg->len = 0; // force back into the listen loop
// any message type other than table data is reported and otherwise not processed here
182 rmr_vlog( RMR_VL_WARN, "rmr_rtc: invalid message type=%d len=%d\n", msg->mtype, (int) msg->len );
188 Route Table Collector
189 A side thread which either attempts to connect and request a table
190 from the Route Manager, or opens a port and listens for Route Manager
191 to push table updates.
193 It may do other things along the way (latency measurements, alarms,
194 respond to RMR pings, etc.).
196 The behaviour with respect to listening for Route Manager updates vs
197 the initiation of the connection and sending a request depends on the
198 value of the ENV_RTG_ADDR (RMR_RTG_SVC) environment variable. If
199 host:port, or IP:port, is given, then we assume that we make the connection
200 and send a request for the table (request mode). If the variable is just
201 a port, then we assume Route Manager will connect and push updates (original
204 If the variable is not defined, the default behaviour, in order to be
205 backwards compatible, depends on the presence of the ENV_CTL_PORT
206 (RMR_CTL_PORT) variable (new with the support for requesting a table).
209 ENV_CTL_PORT ENV_RTG_ADDR Behaviour
210 unset unset Open default CTL port (DEF_CTL_PORT) and
211 wait for Rt Mgr to push tables
213 set unset Use the default Rt Mgr wellknown addr
214 and port (DEF_RTG_WK_ADDR) to connect
215 and request a table. The control port
216 used is the value set by ENV_CTL_PORT.
218 unset set As described above. The default control
219 port (DEF_CTL_PORT) is used.
221 When we are running in request mode, then we will send the RMR message
222 RMRRM_REFRESH to this address (wormhole) as a request for the route manager
223 to send a new table. We will attempt to connect and send requests until
224 we have a table. Calls to rmr_ready() will report FALSE until a table is
225 loaded _unless_ a seed table was given.
227 Route table information is expected to arrive on RMR messages with type
228 RMRRM_TABLE_DATA. There is NOT a specific message type for each possible
229 table record, so the payload is as it appears in the seed file or as
230 delivered in old versions. It may take several RMRRM_TABLE_DATA messages
231 to completely supply a new table or table update. See the header for parse_rt_rec
232 in common for a description of possible message contents.
234 Buffers received from the route table generator can contain multiple newline terminated
235 records, but each buffer must be less than 4K in length, and the last record in a
236 buffer may NOT be split across buffers.
239 In addition to the primary task of getting, vetting, and installing a new route table, or
240 updates to the existing table, this thread will periodically cause the send counts for each
241 endpoint known to be written to standard error. The frequency is once every 180 seconds, and
242 more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
// Route table collector entry point. vctx is the user context (uta_ctx_t)
// passed as a void pointer; a nil pointer is reported with a critical log.
//
// A private context is opened with init() on the control port, and messages
// received on it with rmr_torcv_msg() are handed to rtc_parse_msg(). While
// waiting for a message, a table request is sent with send_update_req() as long
// as RTCFL_HAVE_UPDATE is not set, and endpoint counts are written by
// rt_epcounts(). The shutdown field of the context is checked so the loop can
// be left.
244 static void* rtc( void* vctx ) {
245 uta_ctx_t* ctx; // context user has -- where we pin the route table
246 uta_ctx_t* pvt_cx; // private context for session with rtg
247 rmr_mbuf_t* msg = NULL; // message from rtg
248 char* my_port; // the port number that we will listen on (4561 has been the default for this)
249 char* rtg_addr; // host:port address of route table generator (route manager)
250 char* daddr; // duplicated rtg address string to parse/trash
251 size_t buf_size; // nng needs var pointer not just size?
253 long blabber = 0; // time of last blabber so we don't flood if rtg goes bad
254 int cstate = -1; // connection state to rtg
255 int state; // processing state of some nng function
259 int vfd = -1; // verbose file des if we have one
260 int vlevel = 0; // how chatty we should be 0== no nattering allowed
262 int epfd = -1; // fd for epoll so we can multi-task
263 struct epoll_event events[1]; // list of events to give to epoll; we only have one we care about
264 struct epoll_event epe; // event definition for event to listen to
265 int count_delay = 30; // number of seconds between writing count info; initially every 30s
266 int bump_freq = 0; // time at which we will bump count frequency to every 5 minutes
268 int rt_req_freq = DEF_RTREQ_FREQ; // request frequency (sec) when wanting a new table
269 int nxt_rt_req = 0; // time of next request
272 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
273 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
277 if( (ctx->ephash = rmr_sym_alloc( RT_SIZE )) == NULL ) { // master hash table for endpoints (each rt will reference this)
278 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: unable to allocate an endpoint hash table\n" );
// the verbose file is optional; when named in the environment it is opened once and re-read through refresh_vlevel()
282 if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
283 vfd = open( eptr, O_RDONLY );
284 vlevel = refresh_vlevel( vfd );
// optional override of the table request frequency; values outside of 1-300 fall back to the default
287 if( (eptr = getenv( ENV_RTREQ_FREA )) != NULL ) {
288 rt_req_freq = atoi( eptr );
289 if( rt_req_freq < 1 || rt_req_freq > 300 ) {
290 rt_req_freq = DEF_RTREQ_FREQ;
// NOTE(review): the value logged below is the default, not the rejected value, and the message has no trailing newline unlike the other log messages here
291 rmr_vlog( RMR_VL_WARN, "rmr_rtc: RT request frequency (%d) out of range (1-300), using default", DEF_RTREQ_FREQ );
// NOTE(review): no trailing newline in this message either
294 rmr_vlog( RMR_VL_INFO, "rmr_rtc: RT request frequency set to: %d seconds", rt_req_freq );
// acks are suppressed only while the static (seed) table is being read
296 ctx->flags |= CFL_NO_RTACK; // don't ack when reading from a file
297 read_static_rt( ctx, vlevel ); // seed the route table if one provided
298 ctx->flags &= ~CFL_NO_RTACK;
301 my_port = getenv( ENV_CTL_PORT ); // default port to listen on (likely 4561)
302 if( my_port == NULL || ! *my_port ) { // if undefined, then go with default
303 my_port = DEF_CTL_PORT;
304 daddr = DEF_CTL_PORT; // backwards compat; if ctl port not hard defined, default is to listen
306 daddr = DEF_RTG_WK_ADDR; // if ctl port is defined, then default changes to connecting to well known RM addr
309 if( (rtg_addr = getenv( ENV_RTG_ADDR )) == NULL || ! *rtg_addr ) { // undefined, use default set above
313 daddr = strdup( rtg_addr ); // dup to destroy during parse
315 ntoks = uta_tokenise( daddr, tokens, 120, ':' ); // should be host:port of rt mgr (could be port only which we assume is old listen port)
317 case 0: // should not happen, but prevent accidents and allow default to ignore additional tokens
321 my_port = tokens[0]; // just port -- assume backlevel environment where we just listen
322 flags |= RTCFL_HAVE_UPDATE; // prevent sending update requests
326 if( strcmp( tokens[0], "tcp" ) == 0 ) { // old school nng tcp:xxxx so we listen on xxx
327 flags |= RTCFL_HAVE_UPDATE; // and signal not to try to request an update
330 // rtg_addr points at rt mgr address and my port set from env or default stands as is
335 if( (pvt_cx = init( my_port, MAX_RTC_BUF, FL_NOTHREAD )) == NULL ) { // open a private context (no RT listener!)
336 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
338 while( TRUE ) { // no listen port, just dump counts now and then
339 sleep( count_delay );
340 rt_epcounts( ctx->rtable, ctx->my_name );
348 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rtc thread is running and listening; listening for rtg conns on %s\n", my_port );
350 bump_freq = time( NULL ) + 300; // after 5 minutes we decrease the count frequency
352 while( 1 ) { // until the cows return, pigs fly, or somesuch event likely not to happen
353 while( msg == NULL || msg->len <= 0 ) { // until we actually have something from the other side
354 if( (flags & RTCFL_HAVE_UPDATE) == 0 && time( NULL ) >= nxt_rt_req ) { // no route table updated from rt mgr; request one
355 if( ctx->rtg_whid < 0 ) { // open the wormhole to the route manager only when there is not one already
356 ctx->rtg_whid = rmr_wh_open( pvt_cx, rtg_addr );
358 send_update_req( pvt_cx, ctx );
359 nxt_rt_req = time( NULL ) + rt_req_freq;
362 msg = rmr_torcv_msg( pvt_cx, msg, 1000 ); // receive with a timeout value of 1000
364 if( time( NULL ) > blabber ) {
365 vlevel = refresh_vlevel( vfd );
366 if( vlevel >= 0 ) { // allow it to be forced off with -n in verbose file
367 blabber = time( NULL ) + count_delay; // set next time to blabber, then do so
368 if( blabber > bump_freq ) {
371 rt_epcounts( ctx->rtable, ctx->my_name );
375 if( ctx->shutdown != 0 ) {
376 break; // mostly for unit test, but allows a forced stop
380 vlevel = refresh_vlevel( vfd ); // ensure it's fresh when we get a message
382 if( msg != NULL && msg->len > 0 ) {
383 rtc_parse_msg( ctx, pvt_cx, msg, vlevel, &flags );
386 if( ctx->shutdown ) { // mostly for testing, but allows user app to close us down if rmr_*() function sets this
392 return NULL; // unreachable, but some compilers don't see that and complain.
396 // this is nng specific inasmuch as we allow raw (non-RMR) messages
399 NOTE: This is the original rtc code when we supported "raw" nano/nng messages
400 from the route manager. It is deprecated in favour of managing all RM-RMR
401 communications via an RMR session.
403 The rtc() function above is the new and preferred function regardless
406 -----------------------------------------------------------------------------------
407 Route Table Collector
408 A side thread which opens a socket and subscribes to a routing table generator.
409 It may do other things along the way (latency measurements?).
411 The pointer is a pointer to the context.
413 Listens for records from the route table generation publisher, expecting
414 one of the following, newline terminated, ASCII records:
415 rte|msg-type||]name:port,name:port,...;name:port,... // route table entry with one or more groups of endpoints
416 new|start // start of new table
417 new|end // end of new table; complete
419 Name must be a host name which can be looked up via gethostbyname() (DNS).
421 Multiple endpoints (name:port) may be given separated by a comma; an endpoint is selected using round robin
422 for each message of the type that is sent.
424 Multiple endpoint groups can be given as a comma separated list of endpoints, separated by semicolons:
425 group1n1:port,group1n2:port,group1n3:port;group2n1:port,group2n2:port
427 If multiple groups are given, when send() is called for the corresponding message type,
428 the message will be sent to one endpoint in each group.
430 msg-type is the numeric message type (e.g. 102). If it is given as n,name then it is assumed
431 that the entry applies only to the instance running with the hostname 'name.'
433 Buffers received from the route table generator can contain multiple newline terminated
434 records, but each buffer must be less than 4K in length, and the last record in a
435 buffer may NOT be split across buffers.
438 In addition to the primary task of getting, vetting, and installing a new route table, or
439 updates to the existing table, this thread will periodically cause the send counts for each
440 endpoint known to be written to standard error. The frequency is once every 180 seconds, and
441 more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
// Original collector entry point which also accepts raw (non-RMR) messages from
// the route table generator. vctx is the user context (uta_ctx_t) passed as a
// void pointer; a nil pointer is reported with a critical log.
//
// A private context is opened with init() on the RTG port. Messages are taken
// with rcv_payload() when raw_interface is set, otherwise with rmr_rcv_msg();
// each newline separated record is given to parse_rt_rec(). When no message is
// pending, endpoint counts are written by rt_epcounts().
443 static void* raw_rtc( void* vctx ) {
444 uta_ctx_t* ctx; // context user has -- where we pin the route table
445 uta_ctx_t* pvt_cx; // private context for session with rtg
446 rmr_mbuf_t* msg = NULL; // message from rtg
447 char* payload; // payload in the message
449 char* port; // a port number we listen/connect to
450 char* fport; // pointer to the real buffer to free
451 size_t buf_size; // nng needs var pointer not just size?
452 char* nextr; // pointer at next record in the message
453 char* curr; // current record
455 long blabber = 0; // time of last blabber so we don't flood if rtg goes bad
456 int cstate = -1; // connection state to rtg
457 int state; // processing state of some nng function
461 int pbuf_size = 0; // number allocated in pbuf
463 int raw_interface = 1; // rtg is using raw NNG/Nano not RMr to send updates
464 int vfd = -1; // verbose file des if we have one
465 int vlevel = 0; // how chatty we should be 0== no nattering allowed
467 int epfd = -1; // fd for epoll so we can multi-task
468 struct epoll_event events[1]; // list of events to give to epoll; we only have one we care about
469 struct epoll_event epe; // event definition for event to listen to
470 int rcv_fd = -1; // pollable file des from NNG to use for timeout
471 int count_delay = 30; // number of seconds between writing count info; initially every 30s
472 int bump_freq = 0; // time at which we will bump count frequency to every 5 minutes
475 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
476 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
480 if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
481 vfd = open( eptr, O_RDONLY );
482 vlevel = refresh_vlevel( vfd );
485 read_static_rt( ctx, vlevel ); // seed the route table if one provided
// either way port ends up pointing at a strdup() copy, which the tokeniser below is given to work on
487 if( (port = getenv( ENV_RTG_PORT )) == NULL || ! *port ) { // port we need to open to listen for RTG connections
488 port = strdup( DEF_RTG_PORT );
490 port = strdup( port );
494 this test is now done in init and this function is started _only_ if the value was 1
495 if( (curr = getenv( ENV_RTG_RAW )) != NULL ) {
496 raw_interface = atoi( curr ) > 0; // if > 0 we assume that rtg messages are NOT coming from an RMr based process
500 fport = port; // must hold to free
502 ntoks = uta_tokenise( port, tokens, 120, ':' ); // assume tcp:port, but it could be port or old style host:port
505 port = tokens[0]; // just the port
509 port = tokens[1]; // tcp:port or :port
513 port = DEF_RTG_PORT; // this shouldn't happen, but paranoia is good
517 if( (pvt_cx = init( port, MAX_RTG_MSG_SZ, FL_NOTHREAD )) == NULL ) { // open a private context
518 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
520 while( TRUE ) { // no listen port, just dump counts now and then
521 sleep( count_delay );
522 rt_epcounts( ctx->rtable, ctx->my_name );
525 free( fport ); // paranoid free and return
529 if( (rcv_fd = rmr_get_rcvfd( pvt_cx )) >= 0 ) { // get the epoll fd for the rtg socket
531 rmr_vlog( RMR_VL_WARN, "cannot get epoll fd for rtg session; stats will generate only after update from rt manager\n" );
533 if( (epfd = epoll_create1( 0 )) < 0 ) {
534 rmr_vlog( RMR_VL_WARN, "stats will generate only after rt manager update; unable to create epoll fd for rtg session: %s\n", strerror( errno ) );
// register the receive fd with epoll for input events only
537 epe.events = EPOLLIN;
538 epe.data.fd = rcv_fd;
540 if( epoll_ctl( epfd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 ) {
541 rmr_vlog( RMR_VL_WARN, "stats will generate only after rt manager update; unable to init epoll_ctl: %s\n", strerror( errno ) );
548 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rtc thread is running and listening; listening for rtg conns on %s\n", port );
551 // future: if we need to register with the rtg, then build a message and send it through a wormhole here
553 bump_freq = time( NULL ) + 300; // after 5 minutes we decrease the count frequency
555 while( 1 ) { // until the cows return, pigs fly, or somesuch event likely not to happen
556 while( msg == NULL || msg->len <= 0 ) { // until we actually have something from the other side
557 if( rcv_fd < 0 || epoll_wait( epfd, events, 1, 1000 ) > 0 ) { // skip epoll if init failed, else block for max 1 sec
558 if( raw_interface ) {
559 msg = (rmr_mbuf_t *) rcv_payload( pvt_cx, msg ); // receive from non-RMr sender
561 msg = rmr_rcv_msg( pvt_cx, msg ); // receive from an RMr sender
563 } else { // no msg, do extra tasks
564 if( msg != NULL ) { // if we were working with a message; ensure no len
566 msg->state = RMR_ERR_TIMEOUT;
570 if( time( NULL ) > blabber ) {
571 vlevel = refresh_vlevel( vfd );
572 if( vlevel >= 0 ) { // allow it to be forced off with -n in verbose file
573 blabber = time( NULL ) + count_delay; // set next time to blabber, then do so
574 if( blabber > bump_freq ) {
577 rt_epcounts( ctx->rtable, ctx->my_name );
582 vlevel = refresh_vlevel( vfd ); // ensure it's fresh when we get a message
584 if( msg != NULL && msg->len > 0 ) {
585 payload = msg->payload;
586 mlen = msg->len; // usable bytes in the payload
588 rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes (%s)\n", (int) mlen, msg->payload );
590 if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes\n", (int) mlen );
// the work buffer must be strictly larger than the message so that the nul written at pbuf[mlen] fits
593 if( pbuf_size <= mlen ) {
600 pbuf_size = mlen * 2; // new size is twice the length of this message
602 pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
604 memcpy( pbuf, payload, mlen );
605 pbuf[mlen] = 0; // don't depend on sender making this a legit string
608 while( curr ) { // loop over each record in the buffer
609 nextr = strchr( curr, '\n' ); // allow multiple newline records, find end of current and mark
616 rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr );
618 if( raw_interface ) {
619 parse_rt_rec( ctx, NULL, curr, vlevel, NULL ); // nil pvt to parser as we can't ack messages
621 parse_rt_rec( ctx, pvt_cx, curr, vlevel, msg ); // parse record and add to in progress table
627 if( ctx->shutdown ) { // mostly for testing, but allows user app to close us down if rmr_*() function sets this
631 msg->len = 0; // force back into the listen loop
635 return NULL; // unreachable, but some compilers don't see that and complain.