1 // : vi ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019-2020 Nokia
5 Copyright (c) 2018-2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: rt_collector.c
23 Abstract: The route table collector is started as a separate pthread and
24 is responsible for listening for route table updates from a
25 route manager or route table generator process.
27 Author: E. Scott Daniels
28 Date: 29 November 2018 (extracted to common 13 March 2019)
31 #ifndef _rt_collector_c
32 #define _rt_collector_c
41 #include <sys/types.h>
45 #include <RIC_message_types.h> // needed for RMR/Rt Mgr msg types
47 // ---- local constants ------------------
#define RTCFL_HAVE_UPDATE	0x01				// an update from RM was received

// The replacement list is parenthesised so that the macro behaves as a single
// operand wherever it is expanded (e.g. x / MAX_RTC_BUF or MAX_RTC_BUF % n).
#define MAX_RTC_BUF	(5 * 1024)				// max buffer size we'll expect is 4k, add some fudge room
53 // ------------------------------------------------------------------------------------------------
56 Loop forever (assuming we're running in a pthread reading the static table
// rtc_file: entry point taking the user's context as an opaque pointer (vctx).
// The visible statements: validate the context, optionally open the file named
// by ENV_VERBOSE_FILE to obtain a verbosity level, mark the context so that no
// route table acks are attempted, and load the static route table.
// NOTE(review): this view of the function is not contiguous (e.g. the declarations
// of eptr and wbuf, and the return paths, are not shown); the comments here
// describe only what the visible lines do.
59 static void* rtc_file( void* vctx ) {
60 uta_ctx_t* ctx; // context user has -- where we pin the route table
62 int vfd = -1; // verbose file des if we have one
63 int vlevel = 0; // how chatty we should be 0== no nattering allowed
// a nil context is reported at critical level
67 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
68 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
// the verbose file is opened read-only when the environment variable is set
72 if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
73 vfd = open( eptr, O_RDONLY );
76 ctx->flags |= CFL_NO_RTACK; // no attempt to ack when reading from a file
// NOTE(review): the result of read() is discarded here; confirm that wbuf is
// nil terminated and that vfd is valid before atoi() is applied to it.
81 read( vfd, wbuf, 10 );
82 vlevel = atoi( wbuf );
85 read_static_rt( ctx, vlevel ); // seed the route table if one provided
87 if( ctx->shutdown != 0 ) { // allow for graceful termination and unit testing
// refresh_vlevel: obtain the current verbosity level from an open file descriptor.
// vfd is the descriptor to read; nothing is read when it is negative.
// Callers in this file assign the result to their vlevel variable.
// NOTE(review): the declarations and the return statement are not part of this view.
94 static int refresh_vlevel( int vfd ) {
98 if( vfd >= 0 ) { // if file is open, read current value
// up to 10 bytes are read and converted with atoi()
// NOTE(review): the read() result is ignored; verify that rbuf is nil terminated
// before the conversion and that a short or failed read cannot leave stale data.
101 read( vfd, rbuf, 10 );
102 vlevel = atoi( rbuf );
109 Rtc_parse_msg parses a single message from the route manager. We allow multiple, newline terminated,
110 records in each message; it is required that the last record in the message be complete (we do not
111 reconstruct records split over multiple messages). For each record, we call the record parser
112 to parse and add the information to the table being built.
114 This function was broken from the main rtc() function in order to be able to unit test it. Without
115 this as a standalone function, it was impossible to simulate a message arriving on the RTC's private
118 To reduce malloc/free cycles, we allocate a static work buffer and expand it when needed; in other
119 words, this is not thread safe but it shouldn't need to be.
// rtc_parse_msg: handle one message received on the route table collector's session.
//	ctx     - user context handed on to parse_rt_rec()
//	pvt_cx  - private context handed on to parse_rt_rec()
//	msg     - the received message; mtype selects the handling, payload/len give the data
//	vlevel  - verbosity level; values > 0 enable the debug log of the message
//	flags   - caller's flag word; RTCFL_HAVE_UPDATE is set on the first table data message
// NOTE(review): this view of the function is not contiguous (declarations of mlen and
// curr, and several closing braces, are not shown).
121 static void rtc_parse_msg( uta_ctx_t *ctx, uta_ctx_t* pvt_cx, rmr_mbuf_t* msg, int vlevel, int* flags ) {
// the work buffer is static so that it persists between calls
122 static unsigned char* pbuf = NULL;
123 static int pbuf_size = 0;
125 unsigned char* payload;
127 unsigned char* nextr;
130 payload = msg->payload;
131 mlen = msg->len; // usable bytes in the payload
133 if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: received rt message type=%d len=%d\n", msg->mtype, (int) mlen );
134 switch( msg->mtype ) {
135 case RMRRM_TABLE_DATA:
// the first table data message flips the flag and is announced once
136 if( (*flags & RTCFL_HAVE_UPDATE) == 0 ) {
137 *flags |= RTCFL_HAVE_UPDATE;
138 rmr_vlog( RMR_VL_INFO, "message flow from route manager starts\n" );
// the buffer is replaced when it cannot hold the payload plus a terminating byte
141 if( pbuf_size <= mlen ) {
148 pbuf_size = mlen * 2;
// NOTE(review): pbuf is declared unsigned char* but the result is cast to char*;
// confirm the allocation result is checked before the memcpy() below.
150 pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
152 memcpy( pbuf, payload, mlen );
153 pbuf[mlen] = 0; // don't depend on sender making this a legit string
155 rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: rt message: (%s)\n", pbuf );
159 while( curr ) { // loop over each record in the buffer
160 nextr = strchr( (char *) curr, '\n' ); // allow multiple newline records, find end of current and mark
167 rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr );
169 parse_rt_rec( ctx, pvt_cx, curr, vlevel, msg ); // parse record and add to in progress table; ack using rts to msg
174 msg->len = 0; // force back into the listen loop
// any other message type is reported as a warning
178 rmr_vlog( RMR_VL_WARN, "rmr_rtc: invalid message type=%d len=%d\n", msg->mtype, (int) msg->len );
184 Route Table Collector
185 A side thread which either attempts to connect and request a table
186 from the Route Manager, or opens a port and listens for Route Manager
187 to push table updates.
189 It may do other things along the way (latency measurements, alarms,
190 respond to RMR pings, etc.).
192 The behaviour with respect to listening for Route Manager updates vs
193 the initiation of the connection and sending a request depends on the
194 value of the ENV_RTG_ADDR (RMR_RTG_SVC) environment variable. If
195 host:port, or IP:port, is given, then we assume that we make the connection
196 and send a request for the table (request mode). If the variable is just
197 a port, then we assume Route Manager will connect and push updates (original
200 If the variable is not defined, the default behaviour, in order to be
201 backwards compatible, depends on the presence of the ENV_CTL_PORT
202 (RMR_CTL_PORT) variable (new with the support for requesting a table).
205 ENV_CTL_PORT ENV_RTG_ADDR Behaviour
206 unset unset Open default CTL port (DEF_CTL_PORT) and
207 wait for Rt Mgr to push tables
209 set unset Use the default Rt Mgr wellknown addr
210 and port (DEF_RTG_WK_ADDR) to connect
211 and request a table. The control port
212 used is the value set by ENV_CTL_PORT.
214 unset set As described above. The default control
215 port (DEF_CTL_PORT) is used.
217 When we are running in request mode, then we will send the RMR message
218 RMRRM_REFRESH to this address (wormhole) as a request for the route manager
219 to send a new table. We will attempt to connect and send requests until
220 we have a table. Calls to rmr_ready() will report FALSE until a table is
221 loaded _unless_ a seed table was given.
223 Route table information is expected to arrive on RMR messages with type
224 RMRRM_TABLE_DATA. There is NOT a specific message type for each possible
225 table record, so the payload is as it appears in the seed file or as
226 delivered in old versions. It may take several RMRRM_TABLE_DATA messages
227 to completely supply a new table or table update. See the header for parse_rt_rec
228 in common for a description of possible message contents.
230 Buffers received from the route table generator can contain multiple newline terminated
231 records, but each buffer must be less than 4K in length, and the last record in a
232 buffer may NOT be split across buffers.
235 In addition to the primary task of getting, vetting, and installing a new route table, or
236 updates to the existing table, this thread will periodically cause the send counts for each
237 endpoint known to be written to standard error. The frequency is once every 180 seconds, and
238 more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
// rtc: route table collector entry point; vctx is the user's context passed as an
// opaque pointer. The visible statements set up the endpoint hash, read the
// verbosity and request-frequency settings from the environment, seed the table
// from a static file, work out the listen port and route manager address, open a
// private context, and then loop receiving messages which are handed to
// rtc_parse_msg(). The final statement returns NULL.
// NOTE(review): this view of the function is not contiguous (declarations of eptr,
// flags, tokens and ntoks, several else arms and closing braces are not shown);
// the comments here describe only the visible lines.
240 static void* rtc( void* vctx ) {
241 uta_ctx_t* ctx; // context user has -- where we pin the route table
242 uta_ctx_t* pvt_cx; // private context for session with rtg
243 rmr_mbuf_t* msg = NULL; // message from rtg
244 char* my_port; // the port number that we will listen on (4561 has been the default for this)
245 char* rtg_addr; // host:port address of route table generator (route manager)
246 char* daddr; // duplicated rtg address string to parse/trash
247 size_t buf_size; // nng needs var pointer not just size?
249 long blabber = 0; // time of last blabber so we don't flood if rtg goes bad
250 int cstate = -1; // connection state to rtg
251 int state; // processing state of some nng function
255 int vfd = -1; // verbose file des if we have one
256 int vlevel = 0; // how chatty we should be 0== no nattering allowed
258 int epfd = -1; // fd for epoll so we can multi-task
259 struct epoll_event events[1]; // list of events to give to epoll; we only have one we care about
260 struct epoll_event epe; // event definition for event to listen to
261 int count_delay = 30; // number of seconds between writing count info; initially every 30s
262 int bump_freq = 0; // time at which we will bump count frequency to every 5 minutes
264 int rt_req_freq = DEF_RTREQ_FREQ; // request frequency (sec) when wanting a new table
// NOTE(review): the result of time() is stored in an int here and below; confirm that is intended
265 int nxt_rt_req = 0; // time of next request
268 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
269 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
273 if( (ctx->ephash = rmr_sym_alloc( RT_SIZE )) == NULL ) { // master hash table for endpoints (each rt will reference this)
274 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: unable to allocate an endpoint hash table\n" );
278 if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
279 vfd = open( eptr, O_RDONLY );
280 vlevel = refresh_vlevel( vfd );
// NOTE(review): the constant is spelled ..._FREA; confirm it matches its definition
283 if( (eptr = getenv( ENV_RTREQ_FREA )) != NULL ) {
284 rt_req_freq = atoi( eptr );
// values outside 1-300 seconds are replaced with the default
285 if( rt_req_freq < 1 || rt_req_freq > 300 ) {
286 rt_req_freq = DEF_RTREQ_FREQ;
// NOTE(review): the %d in the warning receives the default, not the rejected value,
// and neither of the next two messages ends with a newline as the other messages here do.
287 rmr_vlog( RMR_VL_WARN, "rmr_rtc: RT request frequency (%d) out of range (1-300), using default", DEF_RTREQ_FREQ );
290 rmr_vlog( RMR_VL_INFO, "rmr_rtc: RT request frequency set to: %d seconds", rt_req_freq );
// acks are suppressed only for the duration of the static table load
292 ctx->flags |= CFL_NO_RTACK; // don't ack when reading from a file
293 read_static_rt( ctx, vlevel ); // seed the route table if one provided
294 ctx->flags &= ~CFL_NO_RTACK;
297 my_port = getenv( ENV_CTL_PORT ); // default port to listen on (likely 4561)
298 if( my_port == NULL || ! *my_port ) { // if undefined, then go with default
299 my_port = DEF_CTL_PORT;
300 daddr = DEF_CTL_PORT; // backwards compat; if ctl port not hard defined, default is to listen
302 daddr = DEF_RTG_WK_ADDR; // if ctl port is defined, then default changes to connecting to well known RM addr
305 if( (rtg_addr = getenv( ENV_RTG_ADDR )) == NULL || ! *rtg_addr ) { // undefined, use default set above
309 daddr = strdup( rtg_addr ); // dup to destroy during parse
311 ntoks = uta_tokenise( daddr, tokens, 120, ':' ); // should be host:port of rt mgr (could be port only which we assume is old listen port)
313 case 0: // should not happen, but prevent accidents and allow default to ignore additional tokens
317 my_port = tokens[0]; // just port -- assume backlevel environment where we just listen
318 flags |= RTCFL_HAVE_UPDATE; // prevent sending update requests
322 if( strcmp( tokens[0], "tcp" ) == 0 ) { // old school nng tcp:xxxx so we listen on xxx
323 flags |= RTCFL_HAVE_UPDATE; // and signal not to try to request an update
326 // rtg_addr points at rt mgr address and my port set from env or default stands as is
331 if( (pvt_cx = init( my_port, MAX_RTC_BUF, FL_NOTHREAD )) == NULL ) { // open a private context (no RT listener!)
332 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
334 while( TRUE ) { // no listen port, just dump counts now and then
335 sleep( count_delay );
336 rt_epcounts( ctx->rtable, ctx->my_name );
344 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rtc thread is running and listening; listening for rtg conns on %s\n", my_port );
346 bump_freq = time( NULL ) + 300; // after 5 minutes we decrease the count frequency
348 while( 1 ) { // until the cows return, pigs fly, or somesuch event likely not to happen
349 while( msg == NULL || msg->len <= 0 ) { // until we actually have something from the other side
350 if( (flags & RTCFL_HAVE_UPDATE) == 0 && time( NULL ) >= nxt_rt_req ) { // no route table updated from rt mgr; request one
// the wormhole to the route manager is opened when no id is held (rtg_whid < 0)
351 if( ctx->rtg_whid < 0 ) {
352 ctx->rtg_whid = rmr_wh_open( pvt_cx, rtg_addr );
354 send_update_req( pvt_cx, ctx );
355 nxt_rt_req = time( NULL ) + rt_req_freq;
// receive with a 1000 timeout value so that the periodic work below still runs
358 msg = rmr_torcv_msg( pvt_cx, msg, 1000 );
360 if( time( NULL ) > blabber ) {
361 vlevel = refresh_vlevel( vfd );
362 if( vlevel >= 0 ) { // allow it to be forced off with -n in verbose file
363 blabber = time( NULL ) + count_delay; // set next time to blabber, then do so
364 if( blabber > bump_freq ) {
367 rt_epcounts( ctx->rtable, ctx->my_name );
371 if( ctx->shutdown != 0 ) {
372 break; // mostly for unit test, but allows a forced stop
376 vlevel = refresh_vlevel( vfd ); // ensure it's fresh when we get a message
// only a message with a positive length is handed to the parser
378 if( msg != NULL && msg->len > 0 ) {
379 rtc_parse_msg( ctx, pvt_cx, msg, vlevel, &flags );
382 if( ctx->shutdown ) { // mostly for testing, but allows user app to close us down if rmr_*() function sets this
388 return NULL; // unreachable, but some compilers don't see that and complain.
392 // this is nng specific inas much as we allow raw (non-RMR) messages
395 NOTE: This is the original rtc code when we supported "raw" nano/nng messages
396 from the route manager. It is deprecated in favour of managing all RM-RMR
397 communications via an RMR session.
399 The rtc() function above is the new and preferred function regardless
402 -----------------------------------------------------------------------------------
403 Route Table Collector
404 A side thread which opens a socket and subscribes to a routing table generator.
405 It may do other things along the way (latency measurements?).
407 The pointer is a pointer to the context.
409 Listens for records from the route table generation publisher, expecting
410 one of the following, newline terminated, ASCII records:
411 rte|msg-type||]name:port,name:port,...;name:port,... // route table entry with one or more groups of endpoints
412 new|start // start of new table
413 new|end // end of new table; complete
415 Name must be a host name which can be looked up via gethostbyname() (DNS).
417 Multiple endpoints (name:port) may be given separated by a comma; an endpoint is selected using round robin
418 for each message of the type that is sent.
420 Multiple endpoint groups can be given as a comma separated list of endpoints, separated by semicolons:
421 group1n1:port,group1n2:port,group1n3:port;group2n1:port,group2n2:port
423 If multiple groups are given, when send() is called for the corresponding message type,
424 the message will be sent to one endpoint in each group.
426 msg-type is the numeric message type (e.g. 102). If it is given as n,name then it is assumed
427 that the entry applies only to the instance running with the hostname 'name.'
429 Buffers received from the route table generator can contain multiple newline terminated
430 records, but each buffer must be less than 4K in length, and the last record in a
431 buffer may NOT be split across buffers.
434 In addition to the primary task of getting, vetting, and installing a new route table, or
435 updates to the existing table, this thread will periodically cause the send counts for each
436 endpoint known to be written to standard error. The frequency is once every 180 seconds, and
437 more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
// raw_rtc: route table collector variant which can receive raw (non-RMR) payloads;
// vctx is the user's context passed as an opaque pointer. The visible statements
// read the verbosity file, seed the table from a static file, derive the listen
// port from ENV_RTG_PORT, open a private context, set up epoll on the receive fd,
// and loop receiving buffers whose newline separated records are handed to
// parse_rt_rec(). The final statement returns NULL.
// NOTE(review): this view of the function is not contiguous (declarations of eptr,
// pbuf, mlen, tokens and ntoks, several else arms and closing braces are not shown);
// the comments here describe only the visible lines.
439 static void* raw_rtc( void* vctx ) {
440 uta_ctx_t* ctx; // context user has -- where we pin the route table
441 uta_ctx_t* pvt_cx; // private context for session with rtg
442 rmr_mbuf_t* msg = NULL; // message from rtg
443 char* payload; // payload in the message
445 char* port; // a port number we listen/connect to
446 char* fport; // pointer to the real buffer to free
447 size_t buf_size; // nng needs var pointer not just size?
448 char* nextr; // pointer at next record in the message
449 char* curr; // current record
451 long blabber = 0; // time of last blabber so we don't flood if rtg goes bad
452 int cstate = -1; // connection state to rtg
453 int state; // processing state of some nng function
457 int pbuf_size = 0; // number allocated in pbuf
459 int raw_interface = 1; // rtg is using raw NNG/Nano not RMr to send updates
460 int vfd = -1; // verbose file des if we have one
461 int vlevel = 0; // how chatty we should be 0== no nattering allowed
463 int epfd = -1; // fd for epoll so we can multi-task
464 struct epoll_event events[1]; // list of events to give to epoll; we only have one we care about
465 struct epoll_event epe; // event definition for event to listen to
466 int rcv_fd = -1; // pollable file des from NNG to use for timeout
467 int count_delay = 30; // number of seconds between writing count info; initially every 30s
468 int bump_freq = 0; // time at which we will bump count frequency to every 5 minutes
471 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
472 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
476 if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
477 vfd = open( eptr, O_RDONLY );
478 vlevel = refresh_vlevel( vfd );
481 read_static_rt( ctx, vlevel ); // seed the route table if one provided
// port is always a duplicated string at this point so that it can be tokenised and later freed
483 if( (port = getenv( ENV_RTG_PORT )) == NULL || ! *port ) { // port we need to open to listen for RTG connections
484 port = strdup( DEF_RTG_PORT );
486 port = strdup( port );
// NOTE(review): the next three lines read as a retired (commented out) test of
// ENV_RTG_RAW; the comment delimiters are not part of this view -- confirm.
490 this test is now done in init and this function is started _only_ if the value was 1
491 if( (curr = getenv( ENV_RTG_RAW )) != NULL ) {
492 raw_interface = atoi( curr ) > 0; // if > 0 we assume that rtg messages are NOT coming from an RMr based process
496 fport = port; // must hold to free
498 ntoks = uta_tokenise( port, tokens, 120, ':' ); // assume tcp:port, but it could be port or old style host:port
501 port = tokens[0]; // just the port
505 port = tokens[1]; // tcp:port or :port
509 port = DEF_RTG_PORT; // this shouldn't happen, but paranoia is good
513 if( (pvt_cx = init( port, MAX_RTG_MSG_SZ, FL_NOTHREAD )) == NULL ) { // open a private context
514 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
516 while( TRUE ) { // no listen port, just dump counts now and then
517 sleep( count_delay );
518 rt_epcounts( ctx->rtable, ctx->my_name );
521 free( fport ); // paranoid free and return
525 if( (rcv_fd = rmr_get_rcvfd( pvt_cx )) >= 0 ) { // get the epoll fd for the rtg socket
527 rmr_vlog( RMR_VL_WARN, "cannot get epoll fd for rtg session; stats will generate only after update from rt manager\n" );
529 if( (epfd = epoll_create1( 0 )) < 0 ) {
530 rmr_vlog( RMR_VL_WARN, "stats will generate only after rt manager update; unable to create epoll fd for rtg session: %s\n", strerror( errno ) );
// register the receive fd for readable events
533 epe.events = EPOLLIN;
534 epe.data.fd = rcv_fd;
536 if( epoll_ctl( epfd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 ) {
537 rmr_vlog( RMR_VL_WARN, "stats will generate only after rt manager update; unable to init epoll_ctl: %s\n", strerror( errno ) );
544 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rtc thread is running and listening; listening for rtg conns on %s\n", port );
547 // future: if we need to register with the rtg, then build a message and send it through a wormhole here
549 bump_freq = time( NULL ) + 300; // after 5 minutes we decrease the count frequency
551 while( 1 ) { // until the cows return, pigs fly, or somesuch event likely not to happen
552 while( msg == NULL || msg->len <= 0 ) { // until we actually have something from the other side
553 if( rcv_fd < 0 || epoll_wait( epfd, events, 1, 1000 ) > 0 ) { // skip epoll if init failed, else block for max 1 sec
554 if( raw_interface ) {
555 msg = (rmr_mbuf_t *) rcv_payload( pvt_cx, msg ); // receive from non-RMr sender
557 msg = rmr_rcv_msg( pvt_cx, msg ); // receive from an RMr sender
559 } else { // no msg, do extra tasks
560 if( msg != NULL ) { // if we were working with a message; ensure no len
562 msg->state = RMR_ERR_TIMEOUT;
566 if( time( NULL ) > blabber ) {
567 vlevel = refresh_vlevel( vfd );
568 if( vlevel >= 0 ) { // allow it to be forced off with -n in verbose file
569 blabber = time( NULL ) + count_delay; // set next time to blabber, then do so
570 if( blabber > bump_freq ) {
573 rt_epcounts( ctx->rtable, ctx->my_name );
578 vlevel = refresh_vlevel( vfd ); // ensure it's fresh when we get a message
580 if( msg != NULL && msg->len > 0 ) {
581 payload = msg->payload;
582 mlen = msg->len; // usable bytes in the payload
// NOTE(review): the payload is passed to %s before the terminated copy is made
// below; confirm the sender guarantees a nil terminated payload on this path.
584 rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes (%s)\n", (int) mlen, msg->payload );
586 if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes\n", (int) mlen );
// the work buffer is replaced when it cannot hold the payload plus a terminating byte
589 if( pbuf_size <= mlen ) {
596 pbuf_size = mlen * 2;
// NOTE(review): confirm the allocation result is checked before the memcpy() below
598 pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
600 memcpy( pbuf, payload, mlen );
601 pbuf[mlen] = 0; // don't depend on sender making this a legit string
604 while( curr ) { // loop over each record in the buffer
605 nextr = strchr( curr, '\n' ); // allow multiple newline records, find end of current and mark
612 rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr );
614 if( raw_interface ) {
615 parse_rt_rec( ctx, NULL, curr, vlevel, NULL ); // nil pvt to parser as we can't ack messages
617 parse_rt_rec( ctx, pvt_cx, curr, vlevel, msg ); // parse record and add to in progress table
623 if( ctx->shutdown ) { // mostly for testing, but allows user app to close us down if rmr_*() function sets this
627 msg->len = 0; // force back into the listen loop
631 return NULL; // unreachable, but some compilers don't see that and complain.