d6d3490e7a7d560332dd289473dbcfddfb5e60a5
[ric-plt/lib/rmr.git] / src / nng / src / rtable_nng_static.c
1 // : vi ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rtable_nng_static.c
23         Abstract:       Route table management functions which depend on the underlying
24                                 transport mechanism and thus cannot be included with the generic
25                                 route table functions.
26
27                                 This module is designed to be included by any module (main) needing
28                                 the static/private stuff.
29
30         Author:         E. Scott Daniels
31         Date:           29 November 2018
32 */
33
34 #ifndef rtable_static_c
35 #define rtable_static_c
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <netdb.h>
40 #include <errno.h>
41 #include <string.h>
42 #include <errno.h>
43 #include <fcntl.h>
44 #include <sys/types.h>
45 #include <sys/stat.h>
46 #include <unistd.h>
47
48
49 // -----------------------------------------------------------------------------------------------------
50
51 /*
52         Establish a TCP connection to the indicated target (IP address).
53         Target assumed to be address:port.  The new socket is returned via the
54         user supplied pointer so that a success/fail code is returned directly.
55         Return value is 0 (false) on failure, 1 (true)  on success.
56 */
57 static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer ) {
58         char            conn_info[NNG_MAXADDRLEN];      // string to give to nano to make the connection
59         char*           addr;
60         int                     state = FALSE;
61
62         if( target == NULL  ||  (addr = strchr( target, ':' )) == NULL ) {              // bad address:port
63                 fprintf( stderr, "rmr: link2: unable to create link: bad target: %s\n", target == NULL ? "<nil>" : target );
64                 return FALSE;
65         }
66
67         if( nn_sock == NULL ) {
68                 errno = EINVAL;
69                 return FALSE;
70         }
71
72         if( nng_push0_open( nn_sock ) != 0 ) {                  // and assign the mode
73                 fprintf( stderr, "[CRI] rmr: link2: unable to initialise nanomsg push socket to: %s\n", target );
74                 return FALSE;
75         }
76
77         snprintf( conn_info, sizeof( conn_info ), "tcp://%s", target );
78         if( (state = nng_dialer_create( dialer, *nn_sock, conn_info )) != 0 ) {
79                 fprintf( stderr, "[WARN] rmr: link2: unable to create dialer for link to target: %s: %d\n", target, errno );
80                 nng_close( *nn_sock );
81                 return FALSE;
82         }
83
84         nng_dialer_setopt_ms( *dialer,  NNG_OPT_RECONNMAXT, 2000 );             // cap backoff on retries to reasonable amount (2s)
85         nng_dialer_setopt_ms( *dialer,  NNG_OPT_RECONNMINT, 100 );              // start retry 100m after last failure with 2s cap
86
87         if( (state = nng_dialer_start( *dialer, NO_FLAGS )) != 0 ) {                                            // can fail immediatly (unlike nanomsg)
88                 fprintf( stderr, "[WARN] rmr: unable to create link to target: %s: %s\n", target, nng_strerror( state ) );
89                 nng_close( *nn_sock );
90                 return FALSE;
91         }
92
93         if( DEBUG ) fprintf( stderr, "[INFO] rmr_link2l: dial was successful: %s\n", target );
94
95         return TRUE;
96 }
97
98 /*
99         This provides a protocol independent mechanism for establishing the connection to an endpoint.
100         Return is true (1) if the link was opened; false on error.
101 */
102 static int rt_link2_ep( endpoint_t* ep ) {
103         if( ep == NULL ) {
104                 return FALSE;
105         }
106
107         if( ep->open )  {                       // already open, do nothing
108                 return TRUE;
109         }
110
111         ep->open =  uta_link2( ep->addr, &ep->nn_sock, &ep->dialer );
112         return ep->open;
113 }
114
115
116 /*
117         Add an endpoint to a route table entry for the group given. If the endpoint isn't in the
118         hash we add it and create the endpoint struct.
119
120         The caller must supply the specific route table (we assume it will be pending, but they
121         could live on the edge and update the active one, though that's not at all a good idea).
122 */
123 extern endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group  ) {
124         endpoint_t*     ep;
125         rrgroup_t* rrg;                         // pointer at group to update
126
127         if( ! rte || ! rt ) {
128                 fprintf( stderr, "[WARN] uda_add_ep didn't get a valid rt and/or rte pointer\n" );
129                 return NULL;
130         }
131
132         if( rte->nrrgroups <= group ) {
133                 fprintf( stderr, "[WARN] uda_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups );
134                 return NULL;
135         }
136
137         if( (rrg = rte->rrgroups[group]) == NULL ) {
138                 if( (rrg = (rrgroup_t *) malloc( sizeof( *rrg ) )) == NULL ) {
139                         fprintf( stderr, "[WARN] rmr_add_ep: malloc failed for round robin group: group=%d\n", group );
140                         return NULL;
141                 }
142                 memset( rrg, 0, sizeof( *rrg ) );
143
144                 if( (rrg->epts = (endpoint_t **) malloc( sizeof( endpoint_t ) * MAX_EP_GROUP )) == NULL ) {
145                         fprintf( stderr, "[WARN] rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group );
146                         return NULL;
147                 }
148                 memset( rrg->epts, 0, sizeof( endpoint_t ) * MAX_EP_GROUP );
149
150                 rte->rrgroups[group] = rrg;
151
152                 rrg->ep_idx = 0;                                                // next endpoint to send to
153                 rrg->nused = 0;                                                 // number populated
154                 rrg->nendpts = MAX_EP_GROUP;                    // number allocated
155         }
156
157         ep = rt_ensure_ep( rt, ep_name );                       // get the ep and create one if not known
158
159         if( rrg != NULL ) {
160                 if( rrg->nused >= rrg->nendpts ) {
161                         // future: reallocate
162                         fprintf( stderr, "[WARN] endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group );
163                         return NULL;
164                 }
165
166                 rrg->epts[rrg->nused] = ep;
167                 rrg->nused++;
168         }
169
170         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] endpoint added to mtype/group: %d/%d %s\n", rte->mtype, group, ep_name );
171         return ep;
172 }
173
174
175 /*
176         Given a name, find the nano socket needed to send to it. Returns the socket via
177         the user pointer passed in and sets the return value to true (1). If the
178         endpoint cannot be found false (0) is returned.
179 */
180 static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_sock ) {
181         endpoint_t* ep;
182         int state = FALSE;
183
184         if( rt == NULL ) {
185                 return FALSE;
186         }
187
188         ep =  rmr_sym_get( rt->hash, ep_name, 1 );
189         if( ep == NULL ) {
190                 if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s not in hash!\n", ep_name );
191                 if( ! ep_name || (ep = rt_ensure_ep( rt, ep_name)) == NULL ) {                          // create one if not in rt (support rts without entry in our table)
192                         return FALSE;
193                 }
194         }
195
196         if( ! ep->open )  {                                                                             // not open -- connect now
197                 if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s session not started... starting\n", ep_name );
198                 if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
199                         ep->addr = uta_h2ip( ep->name );
200                 }
201                 if( uta_link2( ep->addr, &ep->nn_sock, &ep->dialer ) ) {                // find entry in table and create link
202                         state = TRUE;
203                         ep->open = TRUE;
204                         *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
205                 }
206                 if( DEBUG ) fprintf( stderr, "[DBUG] epsock_bn: connection state: %s %s\n", state ? "[OK]" : "[FAIL]", ep->name );
207         } else {
208                 *nn_sock = ep->nn_sock;
209                 state = TRUE;
210         }
211
212         return state;
213 }
214
215 /*
216         Make a round robin selection within a round robin group for a route table
217         entry. Returns the nanomsg socket if there is a rte for the message
218         type, and group is defined. Socket is returned via pointer in the parm
219         list (nn_sock).
220
221         The group is the group number to select from.
222
223         The user supplied (via pointer to) integer 'more' will be set if there are
224         additional groups beyond the one selected. This allows the caller to
225         to easily iterate over the group list -- more is set when the group should
226         be incremented and the function invoked again. Groups start at 0.
227
228         The return value is true (>0) if the socket was found and *nn_sock was updated
229         and false (0) if there is no associated socket for the msg type, group combination.
230         We return the index+1 from the round robin table on success so that we can verify
231         during test that different entries are being seleted; we cannot depend on the nng
232         socket being different as we could with nano.
233 */
234 static int uta_epsock_rr( route_table_t *rt, int mtype, int group, int* more, nng_socket* nn_sock ) {
235         rtable_ent_t* rte;                      // matching rt entry
236         endpoint_t*     ep;                             // seected end point
237         int  state = FALSE;                     // processing state
238         int dummy;
239         rrgroup_t* rrg;
240
241
242         if( ! more ) {                          // eliminate cheks each time we need to user
243                 more = &dummy;
244         }
245
246         if( ! nn_sock ) {                       // user didn't supply a pointer
247                 errno = EINVAL;
248                 *more = 0;
249                 return FALSE;
250         }
251
252         if( rt == NULL ) {
253                 *more = 0;
254                 return FALSE;
255         }
256
257         if( (rte = rmr_sym_pull( rt->hash, mtype )) == NULL ) {
258                 *more = 0;
259                 //if( DEBUG ) fprintf( stderr, ">>>> rte not found for type = %d\n", mtype );
260                 return FALSE;
261         }
262
263         if( group < 0 || group >= rte->nrrgroups ) {
264                 //if( DEBUG ) fprintf( stderr, ">>>> group out of range: mtype=%d group=%d max=%d\n", mtype, group, rte->nrrgroups );
265                 *more = 0;
266                 return FALSE;
267         }
268
269         if( (rrg = rte->rrgroups[group]) == NULL ) {
270                 //if( DEBUG ) fprintf( stderr, ">>>> rrg not found for type = %d\n", mtype );
271                 *more = 0;                                      // groups are inserted contig, so nothing should be after a nil pointer
272                 return FALSE;
273         }
274
275         *more = group < rte->nrrgroups-1 ? (rte->rrgroups[group+1] != NULL): 0; // more if something in next group slot
276
277         switch( rrg->nused ) {
278                 case 0:                         // nothing allocated, just punt
279                         //if( DEBUG ) fprintf( stderr, ">>>> nothing allocated for the rrg\n" );
280                         return FALSE;
281
282                 case 1:                         // exactly one, no rr to deal with and more is not possible even if fanout > 1
283                         //*nn_sock = rrg->epts[0]->nn_sock;
284                         ep = rrg->epts[0];
285                         //if( DEBUG ) fprintf( stderr, ">>>> _rr returning socket with one choice in group \n" );
286                         state = TRUE;
287                         break;
288
289                 default:                                                                                // need to pick one and adjust rr counts
290                         ep = rrg->epts[rrg->ep_idx++];                          // select next endpoint
291                         //if( DEBUG ) fprintf( stderr, ">>>> _rr returning socket with multiple choices in group idx=%d \n", rrg->ep_idx );
292                         if( rrg->ep_idx >= rrg->nused ) {
293                                 rrg->ep_idx = 0;
294                         }
295                         state = rrg->ep_idx+1;
296                         break;
297         }
298
299         if( state ) {                                                                           // end point selected, open if not, get socket either way
300                 if( ! ep->open ) {                                                              // not connected
301                         if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
302                                 ep->addr = uta_h2ip( ep->name );
303                         }
304
305                         if( uta_link2( ep->addr, &ep->nn_sock, &ep->dialer ) ) {                // find entry in table and create link
306                                 ep->open = TRUE;
307                                 *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
308                         } else {
309                                 state = FALSE;
310                         }
311                         if( DEBUG ) fprintf( stderr, "[DBUG] epsock_rr: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" );
312                 } else {
313                         *nn_sock = ep->nn_sock;
314                 }
315         }
316
317         return state;
318 }
319
320 #endif