Fix SI bug causing core dump sending to closed fd
[ric-plt/lib/rmr.git] / src / rmr / si / src / rtc_si_static.c
1 // : vi ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rtc_si_static.c
23         Abstract:       The route table collector is started as a separate pthread and
24                                 is responsible for listening for route table updates from a
25                                 route manager or route table generator process.
26
27                                 This comes from the common src and may be moved back there once
28                                 it is not necessary to support raw sessions (all route table
29                                 gen messages are received over rmr channel).
30
31         Author:         E. Scott Daniels
32         Date:           29 November 2018 (extracted to common 13 March 2019)
33                                 Imported to si base 17 Jan 2020.
34 */
35
36
37 #ifndef _rtc_si_staic_c
38 #define _rtc_si_staic_c
39
40 #include <stdio.h>
41 #include <stdlib.h>
42 #include <netdb.h>
43 #include <errno.h>
44 #include <string.h>
45 #include <fcntl.h>
46 #include <sys/types.h>
47 #include <sys/stat.h>
48 #include <unistd.h>
49
50 /*
51         Loop forever (assuming we're running in a pthread reading the static table
52         every minute or so.
53 */
54 static void* rtc_file( void* vctx ) {
55         uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
56         char*   eptr;
57         int             vfd = -1;                                       // verbose file des if we have one
58         int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
59         char    wbuf[256];
60
61
62         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
63                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
64                 return NULL;
65         }
66
67         if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
68                 vfd = open( eptr, O_RDONLY );
69         }
70
71         while( 1 ) {
72                 if( vfd >= 0 ) {
73                         wbuf[0] = 0;
74                         lseek( vfd, 0, 0 );
75                         read( vfd, wbuf, 10 );
76                         vlevel = atoi( wbuf );
77                 }
78
79                 read_static_rt( ctx, vlevel );                                          // seed the route table if one provided
80
81                 sleep( 60 );
82         }
83
84 }
85
86 static int refresh_vlevel( int vfd ) {
87         int vlevel = 0;
88         char    rbuf[128];
89
90         if( vfd >= 0 ) {                                        // if file is open, read current value
91                 rbuf[0] = 0;
92                 lseek( vfd, 0, 0 );
93                 read( vfd, rbuf, 10 );
94                 vlevel = atoi( rbuf );
95         }
96
97         return vlevel;
98 }
99
100 /*
101         Route Table Collector
102         A side thread which opens a socket and subscribes to a routing table generator.
103         It may do other things along the way (latency measurements?).
104
105         The pointer is a pointer to the context.
106
107         Listens for records from the route table generation publisher, expecting
108         one of the following, newline terminated, ASCII records:
109                 rte|msg-type||]name:port,name:port,...;name:port,...                    // route table entry with one or more groups of endpoints
110                 new|start                                                               // start of new table
111                 new|end                                                                 // end of new table; complete
112
113                 Name must be a host name which can be looked up via gethostbyname() (DNS).
114
115                 Multiple endpoints (name:port) may be given separated by a comma; an endpoint is selected using round robin
116                         for each message of the type that is sent.
117
118                 Multiple endpoint groups can be given as a comma separated list of endpoints, separated by semicolons:
119                                 group1n1:port,group1n2:port,group1n3:port;group2n1:port,group2n2:port
120
121                 If multiple groups are given, when send() is called for the cooresponding message type,
122                 the message will be sent to one endpoint in each group.
123
124                 msg-type is the numeric message type (e.g. 102). If it is given as n,name then it is assumed
125                 that the entry applies only to the instance running with the hostname 'name.'
126
127         Buffers received from the route table generator can contain multiple newline terminated
128         records, but each buffer must be less than 4K in length, and the last record in a
129         buffer may NOT be split across buffers.
130
131         Other chores:
132         In addition to the primary task of getting, vetting, and installing a new route table, or
133         updates to the existing table, this thread will periodically cause the send counts for each
134         endpoint known to be written to standard error. The frequency is once every 180 seconds, and
135         more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
136 */
137 static void* rtc( void* vctx ) {
138         uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
139         uta_ctx_t*      pvt_cx;                                 // private context for session with rtg
140         rmr_mbuf_t*     msg = NULL;                             // message from rtg
141         char*   payload;                                        // payload in the message
142         size_t  mlen;
143         size_t  clen;                                           // length to copy and mark
144         char*   port;                                           // a port number we listen/connect to
145         char*   fport;                                          // pointer to the real buffer to free
146         size_t  buf_size;                                       // nng needs var pointer not just size?
147         char*   nextr;                                          // pointer at next record in the message
148         char*   curr;                                           // current record
149         int     i;
150         long    blabber = 0;                            // time of last blabber so we don't flood if rtg goes bad
151         int             cstate = -1;                            // connection state to rtg
152         int             state;                                          // processing state of some nng function
153         char*   tokens[128];
154         char    wbuf[128];
155         char*   pbuf = NULL;
156         int             pbuf_size = 0;                          // number allocated in pbuf
157         int             ntoks;
158         int             raw_interface = 0;                      // rtg is using raw NNG/Nano not RMr to send updates
159         int             vfd = -1;                                       // verbose file des if we have one
160         int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
161         char*   eptr;
162         int             epfd = -1;                                      // fd for epoll so we can multi-task
163         struct  epoll_event events[1];          // list of events to give to epoll; we only have one we care about
164         struct  epoll_event epe;                        // event definition for event to listen to
165         int             count_delay = 30;                       // number of seconds between writing count info; initially every 30s
166         int             bump_freq = 0;                          // time at which we will bump count frequency to every 5 minutes
167
168
169         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
170                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
171                 return NULL;
172         }
173
174         if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
175                 vfd = open( eptr, O_RDONLY );
176                 vlevel = refresh_vlevel( vfd );
177         }
178
179         read_static_rt( ctx, vlevel );                                          // seed the route table if one provided
180
181         if( (port = getenv( ENV_RTG_PORT )) == NULL || ! *port ) {              // port we need to open to listen for RTG connections
182                 port = strdup( DEF_RTG_PORT );
183         } else {
184                 port = strdup( port );
185         }
186
187         if( (curr = getenv( ENV_RTG_RAW )) != NULL ) {
188                 raw_interface = atoi( curr ) > 0;                               // if > 0 we assume that rtg messages are NOT coming from an RMr based process
189         }
190
191         fport = port;           // must hold to free
192
193         ntoks = uta_tokenise( port, tokens, 120, ':' );                 // assume tcp:port, but it could be port or old style host:port
194         switch( ntoks ) {
195                 case 1:
196                                 port = tokens[0];                       // just the port
197                                 break;
198
199                 case 2:
200                                 port = tokens[1];                       // tcp:port or :port
201                                 break;
202
203                 default:
204                                 port = DEF_RTG_PORT;            // this shouldn't happen, but parnioia is good
205                                 break;
206         }
207
208         if( (pvt_cx = init( port, MAX_RTG_MSG_SZ, FL_NOTHREAD )) == NULL ) {                            // open a private context (no RT listener!)
209                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
210
211                 while( TRUE ) {                                                                                         // no listen port, just dump counts now and then
212                         sleep( count_delay );
213                         rt_epcounts( ctx->rtable, ctx->my_name );
214                 }
215
216                 free( fport );                                  // parinoid free and return
217                 return NULL;
218         }
219
220         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rtc thread is running and listening; listening for rtg conns on %s\n", port );
221         free( fport );
222
223         // future:  if we need to register with the rtg, then build a message and send it through a wormhole here
224
225         bump_freq = time( NULL ) + 300;                         // after 5 minutes we decrease the count frequency
226         blabber = 0;
227         while( 1 ) {                    // until the cows return, pigs fly, or somesuch event likely not to happen
228                 while( msg == NULL || msg->len <= 0 ) {                                                 // until we actually have something from the other side
229                         msg = rmr_torcv_msg( pvt_cx, msg, 1000 );
230
231                         if( time( NULL ) > blabber  ) {
232                                 vlevel = refresh_vlevel( vfd );
233                                 if( vlevel >= 0 ) {                                                                             // allow it to be forced off with -n in verbose file
234                                         blabber = time( NULL ) + count_delay;                           // set next time to blabber, then do so
235                                         if( blabber > bump_freq ) {
236                                                 count_delay = 300;
237                                         }
238                                         rt_epcounts( ctx->rtable, ctx->my_name );
239                                 }
240                         }
241                 }
242
243                 vlevel = refresh_vlevel( vfd );                 // ensure it's fresh when we get a message
244
245                 if( msg != NULL && msg->len > 0 ) {
246                         payload = msg->payload;
247                         mlen = msg->len;                                        // usable bytes in the payload
248                         if( vlevel > 1 ) {
249                                 rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes (%s)\n", (int) mlen, msg->payload );
250                         } else {
251                                 if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes\n", (int) mlen );
252                         }
253
254                         if( pbuf_size <= mlen ) {
255                                 if( pbuf ) {
256                                         free( pbuf );
257                                 }
258                                 if( mlen < 512 ) {
259                                         pbuf_size = 512;
260                                 } else {
261                                         pbuf_size = mlen * 2;
262                                 }
263                                 pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
264                         }
265                         memcpy( pbuf, payload, mlen );
266                         pbuf[mlen] = 0;                                                                         // don't depend on sender making this a legit string
267
268                         curr = pbuf;
269                         while( curr ) {                                                         // loop over each record in the buffer
270                                 nextr = strchr( curr, '\n' );                   // allow multiple newline records, find end of current and mark
271
272                                 if( nextr ) {
273                                         *(nextr++) = 0;
274                                 }
275
276                                 if( vlevel > 1 ) {
277                                         rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr );
278                                 }
279                                 parse_rt_rec( ctx, curr, vlevel );              // parse record and add to in progress table
280
281                                 curr = nextr;
282                         }
283
284                         if( ctx->shutdown ) {           // mostly for testing, but allows user app to close us down if rmr_*() function sets this
285                                 break;
286                         }
287
288                         msg->len = 0;                           // force back into the listen loop
289                 }
290         }
291
292         return NULL;    // unreachable, but some compilers don't see that and complain.
293 }
294
295
296 #endif