1 // : vi ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019-2020 Nokia
5 Copyright (c) 2018-2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: rt_collector.c
23 Abstract: The route table collector is started as a separate pthread and
24 is responsible for listening for route table updates from a
25 route manager or route table generator process.
27 Author: E. Scott Daniels
28 Date: 29 November 2018 (extracted to common 13 March 2019)
31 #ifndef _rt_collector_c
32 #define _rt_collector_c
41 #include <sys/types.h>
45 #include <RIC_message_types.h> // needed for RMR/Rt Mgr msg types
47 // ---- local constants ------------------
49 #define RTCFL_HAVE_UPDATE 0x01 // an update from RM was received
51 #define MAX_RTC_BUF 5 * 1024 // max buffer size we'll expect is 4k, add some fudge room
53 // ------------------------------------------------------------------------------------------------
56 Loop forever (assuming we're running in a pthread reading the static table
59 static void* rtc_file( void* vctx ) {
60 uta_ctx_t* ctx; // context user has -- where we pin the route table
62 int vfd = -1; // verbose file des if we have one
63 int vlevel = 0; // how chatty we should be 0== no nattering allowed
67 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
68 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
72 if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
73 vfd = open( eptr, O_RDONLY );
76 ctx->flags |= CFL_NO_RTACK; // no attempt to ack when reading from a file
81 if( read( vfd, wbuf, 10 ) > 0 ) {
82 vlevel = atoi( wbuf );
86 read_static_rt( ctx, vlevel ); // seed the route table if one provided
88 if( ctx->shutdown != 0 ) { // allow for graceful termination and unit testing
95 static int refresh_vlevel( int vfd ) {
99 if( vfd >= 0 ) { // if file is open, read current value
102 if( read( vfd, rbuf, 10 ) > 0 ) {
103 vlevel = atoi( rbuf );
111 Rtc_parse_msg parses a single message from the route manager. We allow multiple, newline terminated,
112 records in each message; it is required that the last record in the message be complete (we do not
113 reconstruct records split over multiple messages). For each record, we call the record parser
114 to parse and add the information to the table being built.
116 This function was broken from the main rtc() function in order to be able to unit test it. Without
117 this as a standalone function, it was impossible to simulate a message arriving on the RTC's private
120 To reduce malloc/free cycles, we allocate a static work buffer and expand it when needed; in other
121 words, this is not thread safe but it shouldn't need to be.
123 static void rtc_parse_msg( uta_ctx_t *ctx, uta_ctx_t* pvt_cx, rmr_mbuf_t* msg, int vlevel, int* flags ) {
124 static unsigned char* pbuf = NULL;
125 static int pbuf_size = 0;
127 unsigned char* payload;
129 unsigned char* nextr;
132 payload = msg->payload;
133 mlen = msg->len; // usable bytes in the payload
135 if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: received rt message type=%d len=%d\n", msg->mtype, (int) mlen );
136 switch( msg->mtype ) {
137 case RMRRM_TABLE_DATA:
138 if( (*flags & RTCFL_HAVE_UPDATE) == 0 ) {
139 *flags |= RTCFL_HAVE_UPDATE;
140 rmr_vlog( RMR_VL_INFO, "message flow from route manager starts\n" );
143 if( pbuf_size <= mlen ) {
150 pbuf_size = mlen * 2;
152 pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
154 memcpy( pbuf, payload, mlen );
155 pbuf[mlen] = 0; // don't depend on sender making this a legit string
157 rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: rt message: (%s)\n", pbuf );
161 while( curr ) { // loop over each record in the buffer
162 nextr = strchr( (char *) curr, '\n' ); // allow multiple newline records, find end of current and mark
169 rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr );
171 parse_rt_rec( ctx, pvt_cx, curr, vlevel, msg ); // parse record and add to in progress table; ack using rts to msg
176 msg->len = 0; // force back into the listen loop
180 rmr_vlog( RMR_VL_WARN, "rmr_rtc: invalid message type=%d len=%d\n", msg->mtype, (int) msg->len );
186 Route Table Collector
187 A side thread which either attempts to connect and request a table
188 from the Route Manager, or opens a port and listens for Route Manager
189 to push table updates.
191 It may do other things along the way (latency measurements, alarms,
192 respond to RMR pings, etc.).
194 The behaviour with respect to listening for Route Manager updates vs
195 the initiation of the connection and sending a request depends on the
196 value of the ENV_RTG_ADDR (RMR_RTG_SVC) environment variable. If
197 host:port, or IP:port, is given, then we assume that we make the connection
198 and send a request for the table (request mode). If the variable is just
199 a port, then we assume Route Manager will connect and push updates (original
202 If the variable is not defined, the default behaviour, in order to be
203 backwards compatible, depends on the presence of the ENV_CTL_PORT
204 (RMR_CTL_PORT) variable (new with the support for requesting a table).
207 ENV_CTL_PORT ENV_RTG_ADDR Behaviour
208 unset unset Open default CTL port (DEF_CTL_PORT) and
209 wait for Rt Mgr to push tables
211 set unset Use the default Rt Mgr wellknown addr
212 and port (DEF_RTG_WK_ADDR) to connect
213 and request a table. The control port
214 used is the value set by ENV_CTL_PORT.
216 unset set As described above. The default control
217 port (DEF_CTL_PORT) is used.
219 When we are running in request mode, then we will send the RMR message
220 RMRRM_REFRESH to this address (wormhole) as a request for the route manager
221 to send a new table. We will attempt to connect and send requests until
222 we have a table. Calls to rmr_ready() will report FALSE until a table is
223 loaded _unless_ a seed table was given.
225 Route table information is expected to arrive on RMR messages with type
226 RMRRM_TABLE_DATA. There is NOT a specific message type for each possible
227 table record, so the payload is as it appears in the seed file or as
228 delivered in old versions. It may take several RMRRM_TABLE_DATA messages
229 to completely supply a new table or table update. See the header for parse_rt_rec
230 in common for a description of possible message contents.
232 Buffers received from the route table generator can contain multiple newline terminated
233 records, but each buffer must be less than 4K in length, and the last record in a
234 buffer may NOT be split across buffers.
237 In addition to the primary task of getting, vetting, and installing a new route table, or
238 updates to the existing table, this thread will periodically cause the send counts for each
239 endpoint known to be written to standard error. The frequency is once every 180 seconds, and
240 more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
242 static void* rtc( void* vctx ) {
243 uta_ctx_t* ctx; // context user has -- where we pin the route table
244 uta_ctx_t* pvt_cx; // private context for session with rtg
245 rmr_mbuf_t* msg = NULL; // message from rtg
246 char* my_port; // the port number that we will listen on (4561 has been the default for this)
247 char* rtg_addr; // host:port address of route table generator (route manager)
248 char* daddr; // duplicated rtg address string to parse/trash
249 size_t buf_size; // nng needs var pointer not just size?
251 long blabber = 0; // time of last blabber so we don't flood if rtg goes bad
252 int cstate = -1; // connection state to rtg
253 int state; // processing state of some nng function
257 int vfd = -1; // verbose file des if we have one
258 int vlevel = 0; // how chatty we should be 0== no nattering allowed
260 int epfd = -1; // fd for epoll so we can multi-task
261 struct epoll_event events[1]; // list of events to give to epoll; we only have one we care about
262 struct epoll_event epe; // event definition for event to listen to
263 int count_delay = 30; // number of seconds between writing count info; initially every 30s
264 int bump_freq = 0; // time at which we will bump count frequency to every 5 minutes
266 int rt_req_freq = DEF_RTREQ_FREQ; // request frequency (sec) when wanting a new table
267 int nxt_rt_req = 0; // time of next request
270 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
271 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
275 if( (ctx->ephash = rmr_sym_alloc( RT_SIZE )) == NULL ) { // master hash table for endpoints (each rt will reference this)
276 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: unable to allocate an endpoint hash table\n" );
280 if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
281 vfd = open( eptr, O_RDONLY );
282 vlevel = refresh_vlevel( vfd );
285 if( (eptr = getenv( ENV_RTREQ_FREA )) != NULL ) {
286 rt_req_freq = atoi( eptr );
287 if( rt_req_freq < 1 || rt_req_freq > 300 ) {
288 rt_req_freq = DEF_RTREQ_FREQ;
289 rmr_vlog( RMR_VL_WARN, "rmr_rtc: RT request frequency (%d) out of range (1-300), using default", DEF_RTREQ_FREQ );
292 rmr_vlog( RMR_VL_INFO, "rmr_rtc: RT request frequency set to: %d seconds", rt_req_freq );
294 ctx->flags |= CFL_NO_RTACK; // don't ack when reading from a file
295 read_static_rt( ctx, vlevel ); // seed the route table if one provided
296 ctx->flags &= ~CFL_NO_RTACK;
299 my_port = getenv( ENV_CTL_PORT ); // default port to listen on (likely 4561)
300 if( my_port == NULL || ! *my_port ) { // if undefined, then go with default
301 my_port = DEF_CTL_PORT;
302 daddr = DEF_CTL_PORT; // backwards compat; if ctl port not hard defined, default is to listen
304 daddr = DEF_RTG_WK_ADDR; // if ctl port is defined, then default changes to connecting to well known RM addr
307 if( (rtg_addr = getenv( ENV_RTG_ADDR )) == NULL || ! *rtg_addr ) { // undefined, use default set above
311 daddr = strdup( rtg_addr ); // dup to destroy during parse
313 ntoks = uta_tokenise( daddr, tokens, 120, ':' ); // should be host:ip of rt mgr (could be port only which we assume is old listen port)
315 case 0: // should not happen, but prevent accidents and allow default to ignore additional tokens
319 my_port = tokens[0]; // just port -- assume backlevel environment where we just listen
320 flags |= RTCFL_HAVE_UPDATE; // prevent sending update reqests
324 if( strcmp( tokens[0], "tcp" ) == 0 ) { // old school nng tcp:xxxx so we listen on xxx
325 flags |= RTCFL_HAVE_UPDATE; // and signal not to try to request an update
328 // rtg_addr points at rt mgr address and my port set from env or default stands as is
333 if( (pvt_cx = init( my_port, MAX_RTC_BUF, FL_NOTHREAD )) == NULL ) { // open a private context (no RT listener!)
334 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
336 while( TRUE ) { // no listen port, just dump counts now and then
337 sleep( count_delay );
338 rt_epcounts( ctx->rtable, ctx->my_name );
346 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rtc thread is running and listening; listening for rtg conns on %s\n", my_port );
348 bump_freq = time( NULL ) + 300; // after 5 minutes we decrease the count frequency
350 while( 1 ) { // until the cows return, pigs fly, or somesuch event likely not to happen
351 while( msg == NULL || msg->len <= 0 ) { // until we actually have something from the other side
352 if( (flags & RTCFL_HAVE_UPDATE) == 0 && time( NULL ) >= nxt_rt_req ) { // no route table updated from rt mgr; request one
353 if( ctx->rtg_whid < 0 ) {
354 ctx->rtg_whid = rmr_wh_open( pvt_cx, rtg_addr );
356 send_update_req( pvt_cx, ctx );
357 nxt_rt_req = time( NULL ) + rt_req_freq;
360 msg = rmr_torcv_msg( pvt_cx, msg, 1000 );
362 if( time( NULL ) > blabber ) {
363 vlevel = refresh_vlevel( vfd );
364 if( vlevel >= 0 ) { // allow it to be forced off with -n in verbose file
365 blabber = time( NULL ) + count_delay; // set next time to blabber, then do so
366 if( blabber > bump_freq ) {
369 rt_epcounts( ctx->rtable, ctx->my_name );
373 if( ctx->shutdown != 0 ) {
374 break; // mostly for unit test, but allows a forced stop
378 vlevel = refresh_vlevel( vfd ); // ensure it's fresh when we get a message
380 if( msg != NULL && msg->len > 0 ) {
381 rtc_parse_msg( ctx, pvt_cx, msg, vlevel, &flags );
384 if( ctx->shutdown ) { // mostly for testing, but allows user app to close us down if rmr_*() function sets this
390 return NULL; // unreachable, but some compilers don't see that and complain.
394 // this is nng specific inasmuch as we allow raw (non-RMR) messages
397 NOTE: This is the original rtc code when we supported "raw" nano/nng messages
398 from the route manager. It is deprecated in favour of managing all RM-RMR
399 communications via an RMR session.
401 The rtc() function above is the new and preferred function regardless
404 -----------------------------------------------------------------------------------
405 Route Table Collector
406 A side thread which opens a socket and subscribes to a routing table generator.
407 It may do other things along the way (latency measurements?).
409 The pointer is a pointer to the context.
411 Listens for records from the route table generation publisher, expecting
412 one of the following, newline terminated, ASCII records:
413 rte|msg-type||]name:port,name:port,...;name:port,... // route table entry with one or more groups of endpoints
414 new|start // start of new table
415 new|end // end of new table; complete
417 Name must be a host name which can be looked up via gethostbyname() (DNS).
419 Multiple endpoints (name:port) may be given separated by a comma; an endpoint is selected using round robin
420 for each message of the type that is sent.
422 Multiple endpoint groups can be given as a comma separated list of endpoints, separated by semicolons:
423 group1n1:port,group1n2:port,group1n3:port;group2n1:port,group2n2:port
425 If multiple groups are given, when send() is called for the corresponding message type,
426 the message will be sent to one endpoint in each group.
428 msg-type is the numeric message type (e.g. 102). If it is given as n,name then it is assumed
429 that the entry applies only to the instance running with the hostname 'name.'
431 Buffers received from the route table generator can contain multiple newline terminated
432 records, but each buffer must be less than 4K in length, and the last record in a
433 buffer may NOT be split across buffers.
436 In addition to the primary task of getting, vetting, and installing a new route table, or
437 updates to the existing table, this thread will periodically cause the send counts for each
438 endpoint known to be written to standard error. The frequency is once every 180 seconds, and
439 more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
441 static void* raw_rtc( void* vctx ) {
442 uta_ctx_t* ctx; // context user has -- where we pin the route table
443 uta_ctx_t* pvt_cx; // private context for session with rtg
444 rmr_mbuf_t* msg = NULL; // message from rtg
445 char* payload; // payload in the message
447 char* port; // a port number we listen/connect to
448 char* fport; // pointer to the real buffer to free
449 size_t buf_size; // nng needs var pointer not just size?
450 char* nextr; // pointer at next record in the message
451 char* curr; // current record
453 long blabber = 0; // time of last blabber so we don't flood if rtg goes bad
454 int cstate = -1; // connection state to rtg
455 int state; // processing state of some nng function
459 int pbuf_size = 0; // number allocated in pbuf
461 int raw_interface = 1; // rtg is using raw NNG/Nano not RMr to send updates
462 int vfd = -1; // verbose file des if we have one
463 int vlevel = 0; // how chatty we should be 0== no nattering allowed
465 int epfd = -1; // fd for epoll so we can multi-task
466 struct epoll_event events[1]; // list of events to give to epoll; we only have one we care about
467 struct epoll_event epe; // event definition for event to listen to
468 int rcv_fd = -1; // pollable file des from NNG to use for timeout
469 int count_delay = 30; // number of seconds between writing count info; initially every 30s
470 int bump_freq = 0; // time at which we will bump count frequency to every 5 minutes
473 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
474 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
478 if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
479 vfd = open( eptr, O_RDONLY );
480 vlevel = refresh_vlevel( vfd );
483 read_static_rt( ctx, vlevel ); // seed the route table if one provided
485 if( (port = getenv( ENV_RTG_PORT )) == NULL || ! *port ) { // port we need to open to listen for RTG connections
486 port = strdup( DEF_RTG_PORT );
488 port = strdup( port );
492 this test is now done in init and this function is started _only_ if the value was 1
493 if( (curr = getenv( ENV_RTG_RAW )) != NULL ) {
494 raw_interface = atoi( curr ) > 0; // if > 0 we assume that rtg messages are NOT coming from an RMr based process
498 fport = port; // must hold to free
500 ntoks = uta_tokenise( port, tokens, 120, ':' ); // assume tcp:port, but it could be port or old style host:port
503 port = tokens[0]; // just the port
507 port = tokens[1]; // tcp:port or :port
511 port = DEF_RTG_PORT; // this shouldn't happen, but parnioia is good
515 if( (pvt_cx = init( port, MAX_RTG_MSG_SZ, FL_NOTHREAD )) == NULL ) { // open a private context
516 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
518 while( TRUE ) { // no listen port, just dump counts now and then
519 sleep( count_delay );
520 rt_epcounts( ctx->rtable, ctx->my_name );
523 free( fport ); // parinoid free and return
527 if( (rcv_fd = rmr_get_rcvfd( pvt_cx )) >= 0 ) { // get the epoll fd for the rtg socket
529 rmr_vlog( RMR_VL_WARN, "cannot get epoll fd for rtg session; stats will generate only after update from rt manager\n" );
531 if( (epfd = epoll_create1( 0 )) < 0 ) {
532 rmr_vlog( RMR_VL_WARN, "stats will generate only after rt manager update; unable to create epoll fd for rtg session: %s\n", strerror( errno ) );
535 epe.events = EPOLLIN;
536 epe.data.fd = rcv_fd;
538 if( epoll_ctl( epfd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 ) {
539 rmr_vlog( RMR_VL_WARN, "stats will generate only after rt manager update; unable to init epoll_ctl: %s\n", strerror( errno ) );
546 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rtc thread is running and listening; listening for rtg conns on %s\n", port );
549 // future: if we need to register with the rtg, then build a message and send it through a wormhole here
551 bump_freq = time( NULL ) + 300; // after 5 minutes we decrease the count frequency
553 while( 1 ) { // until the cows return, pigs fly, or somesuch event likely not to happen
554 while( msg == NULL || msg->len <= 0 ) { // until we actually have something from the other side
555 if( rcv_fd < 0 || epoll_wait( epfd, events, 1, 1000 ) > 0 ) { // skip epoll if init failed, else block for max 1 sec
556 if( raw_interface ) {
557 msg = (rmr_mbuf_t *) rcv_payload( pvt_cx, msg ); // receive from non-RMr sender
559 msg = rmr_rcv_msg( pvt_cx, msg ); // receive from an RMr sender
561 } else { // no msg, do extra tasks
562 if( msg != NULL ) { // if we were working with a message; ensure no len
564 msg->state = RMR_ERR_TIMEOUT;
568 if( time( NULL ) > blabber ) {
569 vlevel = refresh_vlevel( vfd );
570 if( vlevel >= 0 ) { // allow it to be forced off with -n in verbose file
571 blabber = time( NULL ) + count_delay; // set next time to blabber, then do so
572 if( blabber > bump_freq ) {
575 rt_epcounts( ctx->rtable, ctx->my_name );
580 vlevel = refresh_vlevel( vfd ); // ensure it's fresh when we get a message
582 if( msg != NULL && msg->len > 0 ) {
583 payload = msg->payload;
584 mlen = msg->len; // usable bytes in the payload
586 rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes (%s)\n", (int) mlen, msg->payload );
588 if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes\n", (int) mlen );
591 if( pbuf_size <= mlen ) {
598 pbuf_size = mlen * 2;
600 pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
602 memcpy( pbuf, payload, mlen );
603 pbuf[mlen] = 0; // don't depend on sender making this a legit string
606 while( curr ) { // loop over each record in the buffer
607 nextr = strchr( curr, '\n' ); // allow multiple newline records, find end of current and mark
614 rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr );
616 if( raw_interface ) {
617 parse_rt_rec( ctx, NULL, curr, vlevel, NULL ); // nil pvt to parser as we can't ack messages
619 parse_rt_rec( ctx, pvt_cx, curr, vlevel, msg ); // parse record and add to in progress table
625 if( ctx->shutdown ) { // mostly for testing, but allows user app to close us down if rmr_*() function sets this
629 msg->len = 0; // force back into the listen loop
633 return NULL; // unreachable, but some compilers don't see that and complain.