Merge "Added new message type constants"
[ric-plt/lib/rmr.git] / src / rmr / common / src / rtc_static.c
1 // : vi ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rt_collector.c
23         Abstract:       The route table collector is started as a separate pthread and
24                                 is responsible for listening for route table updates from a
25                                 route manager or route table generator process.
26
27         Author:         E. Scott Daniels
28         Date:           29 November 2018 (extracted to common 13 March 2019)
29 */
30
31 #ifndef _rt_collector_c
32 #define _rt_collector_c
33
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <netdb.h>
37 #include <errno.h>
38 #include <string.h>
39 #include <errno.h>
40 #include <fcntl.h>
41 #include <sys/types.h>
42 #include <sys/stat.h>
43 #include <unistd.h>
44
45 /*
46         Route Table Collector
47         A side thread which opens a socket and subscribes to a routing table generator.
48         It may do other things along the way (latency measurements?).
49
50         The pointer is a pointer to the context.
51
52         Listens for records from the route table generation publisher, expecting
53         one of the following, newline terminated, ASCII records:
54                 rte|msg-type||]name:port,name:port,...;name:port,...                    // route table entry with one or more groups of endpoints
55                 new|start                                                               // start of new table
56                 new|end                                                                 // end of new table; complete
57
58                 Name must be a host name which can be looked up via gethostbyname() (DNS).
59
60                 Multiple endpoints (name:port) may be given separated by a comma; an endpoint is selected using round robin
61                         for each message of the type that is sent.
62
63                 Multiple endpoint groups can be given as a comma separated list of endpoints, separated by semicolons:
64                                 group1n1:port,group1n2:port,group1n3:port;group2n1:port,group2n2:port
65
66                 If multiple groups are given, when send() is called for the cooresponding message type,
67                 the message will be sent to one endpoint in each group.
68
69                 msg-type is the numeric message type (e.g. 102). If it is given as n,name then it is assumed
70                 that the entry applies only to the instance running with the hostname 'name.'
71
72         Buffers received from the route table generator can contain multiple newline terminated
73         records, but each buffer must be less than 4K in length, and the last record in a
74         buffere may NOT be split across buffers.
75
76 */
77 static void* rtc( void* vctx ) {
78         uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
79         uta_ctx_t*      pvt_cx;                                 // private context for session with rtg
80         rmr_mbuf_t*     msg = NULL;                             // message from rtg
81         char*   payload;                                        // payload in the message
82         size_t  mlen;
83         size_t  clen;                                           // length to copy and mark
84         char*   port;                                           // a port number we listen/connect to
85         char*   fport;                                          // pointer to the real buffer to free
86         size_t  buf_size;                                       // nng needs var pointer not just size?
87         char*   nextr;                                          // pointer at next record in the message
88         char*   curr;                                           // current record
89         int     i;
90         long    blabber = 0;                            // time of last blabber so we don't flood if rtg goes bad
91         int             cstate = -1;                            // connection state to rtg
92         int             state;                                          // processing state of some nng function
93         char*   tokens[128];
94         char    wbuf[128];
95         char*   pbuf = NULL;
96         int             pbuf_size = 0;                          // number allocated in pbuf
97         int             ntoks;
98         int             raw_interface = 1;                      // rtg is using raw NNG/Nano not RMr to send updates
99         int             vfd = -1;                                       // verbose file des if we have one
100         int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
101         char*   eptr;
102
103         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
104                 fprintf( stderr, "[CRI] rmr_rtc: internal mishap: context passed in was nil\n" );
105                 return NULL;
106         }
107
108         if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
109                 vfd = open( eptr, O_RDONLY );
110                 if( vfd >= 0 ) {
111                         wbuf[0] = 0;
112                         lseek( vfd, 0, 0 );
113                         read( vfd, wbuf, 10 );
114                         vlevel = atoi( wbuf );
115                 }
116         }
117
118         read_static_rt( ctx, vlevel );                                          // seed the route table if one provided
119
120         if( (port = getenv( ENV_RTG_PORT )) == NULL || ! *port ) {              // port we need to open to listen for RTG connections
121                 port = strdup( DEF_RTG_PORT );
122         } else {
123                 port = strdup( port );
124         }
125
126         if( (curr = getenv( ENV_RTG_RAW )) != NULL ) {
127                 raw_interface = atoi( curr ) > 0;                               // if > 0 we assume that rtg messages are NOT coming from an RMr based process
128         }
129
130         fport = port;           // must hold to free
131
132         ntoks = uta_tokenise( port, tokens, 120, ':' );                 // assume tcp:port, but it could be port or old style host:port
133         switch( ntoks ) {
134                 case 1:
135                                 port = tokens[0];                       // just the port
136                                 break;
137
138                 case 2:
139                                 port = tokens[1];                       // tcp:port or :port
140                                 break;
141
142                 default:
143                                 port = DEF_RTG_PORT;            // this shouldn't happen, but parnioia is good
144                                 break;
145         }
146
147         if( (pvt_cx = init( port, MAX_RTG_MSG_SZ, FL_NOTHREAD )) == NULL ) {                            // open a private context
148                 fprintf( stderr, "[CRI] rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
149                 free( fport );
150                 return NULL;
151         }
152
153         if( DEBUG ) fprintf( stderr, "[DBUG] rtc thread is running and listening; listening for rtg conns on %s\n", port );
154         free( fport );
155
156         // future:  if we need to register with the rtg, then build a message and send it through a wormhole here
157
158         blabber = 0;
159         while( 1 ) {                    // until the cows return, pigs fly, or somesuch event likely not to happen
160                 if( raw_interface ) {
161                         msg = (rmr_mbuf_t *) rcv_payload( pvt_cx, msg );                // receive from non-RMr sender
162                 } else {
163                         msg = rmr_rcv_msg( pvt_cx, msg );               // receive from an RMr sender
164                 }
165
166                 if( vfd >= 0 ) {                                                        // if changed since last go round
167                         wbuf[0] = 0;
168                         lseek( vfd, 0, 0 );
169                         read( vfd, wbuf, 10 );
170                         vlevel = atoi( wbuf );
171                 }
172
173                 if( msg != NULL && msg->len > 0 ) {
174                         payload = msg->payload;
175                         mlen = msg->len;                                        // usable bytes in the payload
176                         if( vlevel > 1 ) {
177                                 fprintf( stderr, "[DBUG] rmr_rtc: received rt message; %d bytes (%s)\n", (int) mlen, msg->payload );
178                         } else {
179                                 if( DEBUG > 1 || (vlevel > 0) ) fprintf( stderr, "[DBUG] rmr_rtc: received rt message; %d bytes\n", (int) mlen );
180                         }
181
182                         if( pbuf_size <= mlen ) {
183                                 if( pbuf ) {
184                                         free( pbuf );
185                                 }
186                                 if( mlen < 512 ) {
187                                         pbuf_size = 512;
188                                 } else {
189                                         pbuf_size = mlen * 2;
190                                 }
191                                 pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
192                         }
193                         memcpy( pbuf, payload, mlen );
194                         pbuf[mlen] = 0;                                                                         // don't depend on sender making this a legit string
195
196                         curr = pbuf;
197                         while( curr ) {                                                         // loop over each record in the buffer
198                                 nextr = strchr( curr, '\n' );                   // allow multiple newline records, find end of current and mark
199
200                                 if( nextr ) {
201                                         *(nextr++) = 0;
202                                 }
203
204                                 if( vlevel > 1 ) {
205                                         fprintf( stderr, "[DBUG] rmr_rtc: processing (%s)\n", curr );
206                                 }
207                                 parse_rt_rec( ctx, curr, vlevel );              // parse record and add to in progress table
208
209                                 curr = nextr;
210                         }
211
212                         if( ctx->shutdown ) {           // mostly for testing, but allows user app to close us down if rmr_*() function sets this
213                                 break;
214                         }
215                 } else {
216                         if( time( NULL ) > blabber  ) {
217                                 fprintf( stderr, "[WRN] rmr_rtc: nil buffer, or 0 len msg, received from rtg\n" );
218                                 blabber = time( NULL ) + 180;                   // limit to 1 every 3 min or so
219                         }
220                 }
221         }
222
223         return NULL;    // unreachable, but some compilers don't see that and complain.
224 }
225
226
227 #endif