b411dbe4fc958eaa9761690dc92887fb8fb5cc3a
[ric-plt/lib/rmr.git] / src / rmr / common / src / rtc_static.c
1 // : vi ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rt_collector.c
23         Abstract:       The route table collector is started as a separate pthread and
24                                 is responsible for listening for route table updates from a
25                                 route manager or route table generator process.
26
27         Author:         E. Scott Daniels
28         Date:           29 November 2018 (extracted to common 13 March 2019)
29 */
30
31 #ifndef _rt_collector_c
32 #define _rt_collector_c
33
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <netdb.h>
37 #include <errno.h>
38 #include <string.h>
39 #include <errno.h>
40 #include <fcntl.h>
41 #include <sys/types.h>
42 #include <sys/stat.h>
43 #include <unistd.h>
44
45 static int refresh_vlevel( int vfd ) {
46         int vlevel = 0;
47         char    rbuf[128];
48
49         if( vfd >= 0 ) {                                        // if file is open, read current value
50                 rbuf[0] = 0;
51                 lseek( vfd, 0, 0 );
52                 read( vfd, rbuf, 10 );
53                 vlevel = atoi( rbuf );
54         }
55
56         return vlevel;
57 }
58
59 /*
60         Route Table Collector
61         A side thread which opens a socket and subscribes to a routing table generator.
62         It may do other things along the way (latency measurements?).
63
64         The pointer is a pointer to the context.
65
66         Listens for records from the route table generation publisher, expecting
67         one of the following, newline terminated, ASCII records:
68                 rte|msg-type||]name:port,name:port,...;name:port,...                    // route table entry with one or more groups of endpoints
69                 new|start                                                               // start of new table
70                 new|end                                                                 // end of new table; complete
71
72                 Name must be a host name which can be looked up via gethostbyname() (DNS).
73
74                 Multiple endpoints (name:port) may be given separated by a comma; an endpoint is selected using round robin
75                         for each message of the type that is sent.
76
77                 Multiple endpoint groups can be given as a comma separated list of endpoints, separated by semicolons:
78                                 group1n1:port,group1n2:port,group1n3:port;group2n1:port,group2n2:port
79
80                 If multiple groups are given, when send() is called for the cooresponding message type,
81                 the message will be sent to one endpoint in each group.
82
83                 msg-type is the numeric message type (e.g. 102). If it is given as n,name then it is assumed
84                 that the entry applies only to the instance running with the hostname 'name.'
85
86         Buffers received from the route table generator can contain multiple newline terminated
87         records, but each buffer must be less than 4K in length, and the last record in a
88         buffer may NOT be split across buffers.
89
90         Other chores:
91         In addition to the primary task of getting, vetting, and installing a new route table, or
92         updates to the existing table, this thread will periodically cause the send counts for each
93         endpoint known to be written to standard error. The frequency is once every 180 seconds, and
94         more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
95 */
96 static void* rtc( void* vctx ) {
97         uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
98         uta_ctx_t*      pvt_cx;                                 // private context for session with rtg
99         rmr_mbuf_t*     msg = NULL;                             // message from rtg
100         char*   payload;                                        // payload in the message
101         size_t  mlen;
102         size_t  clen;                                           // length to copy and mark
103         char*   port;                                           // a port number we listen/connect to
104         char*   fport;                                          // pointer to the real buffer to free
105         size_t  buf_size;                                       // nng needs var pointer not just size?
106         char*   nextr;                                          // pointer at next record in the message
107         char*   curr;                                           // current record
108         int     i;
109         long    blabber = 0;                            // time of last blabber so we don't flood if rtg goes bad
110         int             cstate = -1;                            // connection state to rtg
111         int             state;                                          // processing state of some nng function
112         char*   tokens[128];
113         char    wbuf[128];
114         char*   pbuf = NULL;
115         int             pbuf_size = 0;                          // number allocated in pbuf
116         int             ntoks;
117         int             raw_interface = 1;                      // rtg is using raw NNG/Nano not RMr to send updates
118         int             vfd = -1;                                       // verbose file des if we have one
119         int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
120         char*   eptr;
121         int             epfd = -1;                                      // fd for epoll so we can multi-task
122         struct  epoll_event events[1];          // list of events to give to epoll; we only have one we care about
123         struct  epoll_event epe;                        // event definition for event to listen to
124         int             rcv_fd = -1;                            // pollable file des from NNG to use for timeout
125         int             count_delay = 30;                       // number of seconds between writing count info; initially every 30s
126         int             bump_freq = 0;                          // time at which we will bump count frequency to every 5 minutes
127
128
129         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
130                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
131                 return NULL;
132         }
133
134         if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
135                 vfd = open( eptr, O_RDONLY );
136                 vlevel = refresh_vlevel( vfd );
137         }                
138
139         read_static_rt( ctx, vlevel );                                          // seed the route table if one provided
140
141         if( (port = getenv( ENV_RTG_PORT )) == NULL || ! *port ) {              // port we need to open to listen for RTG connections
142                 port = strdup( DEF_RTG_PORT );
143         } else {
144                 port = strdup( port );
145         }
146
147         if( (curr = getenv( ENV_RTG_RAW )) != NULL ) {
148                 raw_interface = atoi( curr ) > 0;                               // if > 0 we assume that rtg messages are NOT coming from an RMr based process
149         }
150
151         fport = port;           // must hold to free
152
153         ntoks = uta_tokenise( port, tokens, 120, ':' );                 // assume tcp:port, but it could be port or old style host:port
154         switch( ntoks ) {
155                 case 1:
156                                 port = tokens[0];                       // just the port
157                                 break;
158
159                 case 2:
160                                 port = tokens[1];                       // tcp:port or :port
161                                 break;
162
163                 default:
164                                 port = DEF_RTG_PORT;            // this shouldn't happen, but parnioia is good
165                                 break;
166         }
167
168         if( (pvt_cx = init( port, MAX_RTG_MSG_SZ, FL_NOTHREAD )) == NULL ) {                            // open a private context
169                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
170
171                 while( TRUE ) {                                                                                         // no listen port, just dump counts now and then
172                         sleep( count_delay );
173                         rt_epcounts( ctx->rtable, ctx->my_name );
174                 }
175                 
176                 free( fport );                                  // parinoid free and return
177                 return NULL;
178         }
179
180         if( (rcv_fd = rmr_get_rcvfd( pvt_cx )) >= 0 ) {            // get the epoll fd for the rtg socket
181                 if( rcv_fd < 0 ) {
182                         rmr_vlog( RMR_VL_WARN, "cannot get epoll fd for rtg session; stats will generate only after update from rt manager\n" );
183                 } else {
184                         if( (epfd = epoll_create1( 0 )) < 0 ) {
185                                 rmr_vlog( RMR_VL_WARN, "stats will generate only after rt manager update; unable to create epoll fd for rtg session: %s\n", strerror( errno ) );
186                                 rcv_fd = -1;
187                         } else {
188                                 epe.events = EPOLLIN;
189                                 epe.data.fd = rcv_fd;
190
191                                 if( epoll_ctl( epfd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 )  {
192                                         rmr_vlog( RMR_VL_WARN, "stats will generate only after rt manager update; unable to init epoll_ctl: %s\n", strerror( errno ) );
193                                         rcv_fd = -1;
194                                 }
195                         }
196                 }
197         }
198
199         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rtc thread is running and listening; listening for rtg conns on %s\n", port );
200         free( fport );
201
202         // future:  if we need to register with the rtg, then build a message and send it through a wormhole here
203
204         bump_freq = time( NULL ) + 300;                         // after 5 minutes we decrease the count frequency
205         blabber = 0;
206         while( 1 ) {                    // until the cows return, pigs fly, or somesuch event likely not to happen
207                 while( msg == NULL || msg->len <= 0 ) {                                                 // until we actually have something from the other side
208                         if( rcv_fd < 0 || epoll_wait( epfd, events, 1, 1000 ) > 0 )  {  // skip epoll if init failed, else block for max 1 sec
209                                 if( raw_interface ) {
210                                         msg = (rmr_mbuf_t *) rcv_payload( pvt_cx, msg );                // receive from non-RMr sender
211                                 } else {
212                                         msg = rmr_rcv_msg( pvt_cx, msg );               // receive from an RMr sender
213                                 }
214                         } else {                                                                                                        // no msg, do extra tasks
215                                 if( msg != NULL ) {                                                                             // if we were working with a message; ensure no len
216                                         msg->len = 0;
217                                         msg->state = RMR_ERR_TIMEOUT;
218                                 }
219                         }
220
221                         if( time( NULL ) > blabber  ) {
222                                 vlevel = refresh_vlevel( vfd );
223                                 if( vlevel >= 0 ) {                                                                             // allow it to be forced off with -n in verbose file
224                                         blabber = time( NULL ) + count_delay;                           // set next time to blabber, then do so
225                                         if( blabber > bump_freq ) {
226                                                 count_delay = 300;
227                                         }
228                                         rt_epcounts( ctx->rtable, ctx->my_name );
229                                 }
230                         }
231                 }
232
233                 vlevel = refresh_vlevel( vfd );                 // ensure it's fresh when we get a message
234
235                 if( msg != NULL && msg->len > 0 ) {
236                         payload = msg->payload;
237                         mlen = msg->len;                                        // usable bytes in the payload
238                         if( vlevel > 1 ) {
239                                 rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes (%s)\n", (int) mlen, msg->payload );
240                         } else {
241                                 if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes\n", (int) mlen );
242                         }
243
244                         if( pbuf_size <= mlen ) {
245                                 if( pbuf ) {
246                                         free( pbuf );
247                                 }
248                                 if( mlen < 512 ) {
249                                         pbuf_size = 512;
250                                 } else {
251                                         pbuf_size = mlen * 2;
252                                 }
253                                 pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
254                         }
255                         memcpy( pbuf, payload, mlen );
256                         pbuf[mlen] = 0;                                                                         // don't depend on sender making this a legit string
257
258                         curr = pbuf;
259                         while( curr ) {                                                         // loop over each record in the buffer
260                                 nextr = strchr( curr, '\n' );                   // allow multiple newline records, find end of current and mark
261
262                                 if( nextr ) {
263                                         *(nextr++) = 0;
264                                 }
265
266                                 if( vlevel > 1 ) {
267                                         rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr );
268                                 }
269                                 parse_rt_rec( ctx, curr, vlevel );              // parse record and add to in progress table
270
271                                 curr = nextr;
272                         }
273
274                         if( ctx->shutdown ) {           // mostly for testing, but allows user app to close us down if rmr_*() function sets this
275                                 break;
276                         }
277
278                         msg->len = 0;                           // force back into the listen loop
279                 }
280         }
281
282         return NULL;    // unreachable, but some compilers don't see that and complain.
283 }
284
285
286 #endif