Add ability to track send counts for an endpoint
[ric-plt/lib/rmr.git] / src / rmr / common / src / rtc_static.c
1 // : vi ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rt_collector.c
23         Abstract:       The route table collector is started as a separate pthread and
24                                 is responsible for listening for route table updates from a
25                                 route manager or route table generator process.
26
27         Author:         E. Scott Daniels
28         Date:           29 November 2018 (extracted to common 13 March 2019)
29 */
30
31 #ifndef _rt_collector_c
32 #define _rt_collector_c
33
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <netdb.h>
37 #include <errno.h>
38 #include <string.h>
39 #include <errno.h>
40 #include <fcntl.h>
41 #include <sys/types.h>
42 #include <sys/stat.h>
43 #include <unistd.h>
44
45 /*
46         Route Table Collector
47         A side thread which opens a socket and subscribes to a routing table generator.
48         It may do other things along the way (latency measurements?).
49
50         The pointer is a pointer to the context.
51
52         Listens for records from the route table generation publisher, expecting
53         one of the following, newline terminated, ASCII records:
54                 rte|msg-type||]name:port,name:port,...;name:port,...                    // route table entry with one or more groups of endpoints
55                 new|start                                                               // start of new table
56                 new|end                                                                 // end of new table; complete
57
58                 Name must be a host name which can be looked up via gethostbyname() (DNS).
59
60                 Multiple endpoints (name:port) may be given separated by a comma; an endpoint is selected using round robin
61                         for each message of the type that is sent.
62
63                 Multiple endpoint groups can be given as a comma separated list of endpoints, separated by semicolons:
64                                 group1n1:port,group1n2:port,group1n3:port;group2n1:port,group2n2:port
65
66                 If multiple groups are given, when send() is called for the cooresponding message type,
67                 the message will be sent to one endpoint in each group.
68
69                 msg-type is the numeric message type (e.g. 102). If it is given as n,name then it is assumed
70                 that the entry applies only to the instance running with the hostname 'name.'
71
72         Buffers received from the route table generator can contain multiple newline terminated
73         records, but each buffer must be less than 4K in length, and the last record in a
74         buffere may NOT be split across buffers.
75
76         Other chores:
77         In addition to the primary task of getting, vetting, and installing a new route table, or
78         updates to the existing table, this thread will periodically cause the send counts for each
79         endpoint known to be written to standard error. The frequency is once every 180 seconds, and
80         more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
81 */
82 static void* rtc( void* vctx ) {
83         uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
84         uta_ctx_t*      pvt_cx;                                 // private context for session with rtg
85         rmr_mbuf_t*     msg = NULL;                             // message from rtg
86         char*   payload;                                        // payload in the message
87         size_t  mlen;
88         size_t  clen;                                           // length to copy and mark
89         char*   port;                                           // a port number we listen/connect to
90         char*   fport;                                          // pointer to the real buffer to free
91         size_t  buf_size;                                       // nng needs var pointer not just size?
92         char*   nextr;                                          // pointer at next record in the message
93         char*   curr;                                           // current record
94         int     i;
95         long    blabber = 0;                            // time of last blabber so we don't flood if rtg goes bad
96         int             cstate = -1;                            // connection state to rtg
97         int             state;                                          // processing state of some nng function
98         char*   tokens[128];
99         char    wbuf[128];
100         char*   pbuf = NULL;
101         int             pbuf_size = 0;                          // number allocated in pbuf
102         int             ntoks;
103         int             raw_interface = 1;                      // rtg is using raw NNG/Nano not RMr to send updates
104         int             vfd = -1;                                       // verbose file des if we have one
105         int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
106         char*   eptr;
107         int             epfd = -1;                                      // fd for epoll so we can multi-task
108         struct  epoll_event events[1];          // list of events to give to epoll; we only have one we care about
109         struct  epoll_event epe;                        // event definition for event to listen to
110         int             rcv_fd = -1;                            // pollable file des from NNG to use for timeout
111         int             count_delay = 30;                       // number of seconds between writing count info; initially every 30s
112         int             bump_freq = 0;                          // time at which we will bump count frequency to every 5 minutes
113
114
115         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
116                 fprintf( stderr, "[CRI] rmr_rtc: internal mishap: context passed in was nil\n" );
117                 return NULL;
118         }
119
120         if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
121                 vfd = open( eptr, O_RDONLY );
122                 if( vfd >= 0 ) {
123                         wbuf[0] = 0;
124                         lseek( vfd, 0, 0 );
125                         read( vfd, wbuf, 10 );
126                         vlevel = atoi( wbuf );
127                 }
128         }                
129
130         read_static_rt( ctx, vlevel );                                          // seed the route table if one provided
131
132         if( (port = getenv( ENV_RTG_PORT )) == NULL || ! *port ) {              // port we need to open to listen for RTG connections
133                 port = strdup( DEF_RTG_PORT );
134         } else {
135                 port = strdup( port );
136         }
137
138         if( (curr = getenv( ENV_RTG_RAW )) != NULL ) {
139                 raw_interface = atoi( curr ) > 0;                               // if > 0 we assume that rtg messages are NOT coming from an RMr based process
140         }
141
142         fport = port;           // must hold to free
143
144         ntoks = uta_tokenise( port, tokens, 120, ':' );                 // assume tcp:port, but it could be port or old style host:port
145         switch( ntoks ) {
146                 case 1:
147                                 port = tokens[0];                       // just the port
148                                 break;
149
150                 case 2:
151                                 port = tokens[1];                       // tcp:port or :port
152                                 break;
153
154                 default:
155                                 port = DEF_RTG_PORT;            // this shouldn't happen, but parnioia is good
156                                 break;
157         }
158
159         if( (pvt_cx = init( port, MAX_RTG_MSG_SZ, FL_NOTHREAD )) == NULL ) {                            // open a private context
160                 fprintf( stderr, "[CRI] rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
161
162                 while( TRUE ) {                                                                                         // no listen port, just dump counts now and then
163                         sleep( count_delay );
164                         rt_epcounts( ctx->rtable, ctx->my_name );
165                 }
166                 
167                 free( fport );                                  // parinoid free and return
168                 return NULL;
169         }
170
171         if( (rcv_fd = rmr_get_rcvfd( pvt_cx )) >= 0 ) {            // get the epoll fd for the rtg socket
172                 if( rcv_fd < 0 ) {
173                         fprintf( stderr, "[WARN] cannot get epoll fd for rtg session; stats will generate only after update from rt manager\n" );
174                 } else {
175                         if( (epfd = epoll_create1( 0 )) < 0 ) {
176                                 fprintf( stderr, "[WARN] stats will generate only after rt manager update; unable to create epoll fd for rtg session: %s\n", strerror( errno ) );
177                                 rcv_fd = -1;
178                         } else {
179                                 epe.events = EPOLLIN;
180                                 epe.data.fd = rcv_fd;
181
182                                 if( epoll_ctl( epfd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 )  {
183                                         fprintf( stderr, "[WARN] stats will generate only after rt manager update; unable to init epoll_ctl: %s\n", strerror( errno ) );
184                                         rcv_fd = -1;
185                                 }
186                         }
187                 }
188         }
189
190         if( DEBUG ) fprintf( stderr, "[DBUG] rtc thread is running and listening; listening for rtg conns on %s\n", port );
191         free( fport );
192
193         // future:  if we need to register with the rtg, then build a message and send it through a wormhole here
194
195         bump_freq = time( NULL ) + 300;                         // after 5 minutes we decrease the count frequency
196         blabber = 0;
197         while( 1 ) {                    // until the cows return, pigs fly, or somesuch event likely not to happen
198                 while( msg == NULL || msg->len <= 0 ) {                                                 // until we actually have something from the other side
199                         if( rcv_fd < 0 || epoll_wait( epfd, events, 1, 1000 ) > 0 )  {  // skip epoll if init failed, else block for max 1 sec
200                                 if( raw_interface ) {
201                                         msg = (rmr_mbuf_t *) rcv_payload( pvt_cx, msg );                // receive from non-RMr sender
202                                 } else {
203                                         msg = rmr_rcv_msg( pvt_cx, msg );               // receive from an RMr sender
204                                 }
205                         } else {                                                                                                        // no msg, do extra tasks
206                                 if( msg != NULL ) {                                                                             // if we were working with a message; ensure no len
207                                         msg->len = 0;
208                                         msg->state = RMR_ERR_TIMEOUT;
209                                 }
210                         }
211
212                         if( time( NULL ) > blabber  ) {
213                                 blabber = time( NULL ) + count_delay;                                   // set next time to blabber, then do so
214                                 if( blabber > bump_freq ) {
215                                         count_delay = 300;
216                                 }
217                                 rt_epcounts( ctx->rtable, ctx->my_name );
218                         }
219                 }
220
221                 if( vfd >= 0 ) {                                                        // if file open, check for change to vlevel
222                         wbuf[0] = 0;
223                         lseek( vfd, 0, 0 );
224                         read( vfd, wbuf, 10 );
225                         vlevel = atoi( wbuf );
226                 }
227
228                 if( msg != NULL && msg->len > 0 ) {
229                         payload = msg->payload;
230                         mlen = msg->len;                                        // usable bytes in the payload
231                         if( vlevel > 1 ) {
232                                 fprintf( stderr, "[DBUG] rmr_rtc: received rt message; %d bytes (%s)\n", (int) mlen, msg->payload );
233                         } else {
234                                 if( DEBUG > 1 || (vlevel > 0) ) fprintf( stderr, "[DBUG] rmr_rtc: received rt message; %d bytes\n", (int) mlen );
235                         }
236
237                         if( pbuf_size <= mlen ) {
238                                 if( pbuf ) {
239                                         free( pbuf );
240                                 }
241                                 if( mlen < 512 ) {
242                                         pbuf_size = 512;
243                                 } else {
244                                         pbuf_size = mlen * 2;
245                                 }
246                                 pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
247                         }
248                         memcpy( pbuf, payload, mlen );
249                         pbuf[mlen] = 0;                                                                         // don't depend on sender making this a legit string
250
251                         curr = pbuf;
252                         while( curr ) {                                                         // loop over each record in the buffer
253                                 nextr = strchr( curr, '\n' );                   // allow multiple newline records, find end of current and mark
254
255                                 if( nextr ) {
256                                         *(nextr++) = 0;
257                                 }
258
259                                 if( vlevel > 1 ) {
260                                         fprintf( stderr, "[DBUG] rmr_rtc: processing (%s)\n", curr );
261                                 }
262                                 parse_rt_rec( ctx, curr, vlevel );              // parse record and add to in progress table
263
264                                 curr = nextr;
265                         }
266
267                         if( ctx->shutdown ) {           // mostly for testing, but allows user app to close us down if rmr_*() function sets this
268                                 break;
269                         }
270
271                         msg->len = 0;                           // force back into the listen loop
272                 }
273         }
274
275         return NULL;    // unreachable, but some compilers don't see that and complain.
276 }
277
278
279 #endif