4f413be6b64a297b9ed1a16bcdba9e86cab11a76
[ric-plt/lib/rmr.git] / src / rmr / nng / src / rtable_nng_static.c
1 // : vi ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rtable_nng_static.c
23         Abstract:       Route table management functions which depend on the underlying
24                                 transport mechanism and thus cannot be included with the generic
25                                 route table functions.
26
27                                 This module is designed to be included by any module (main) needing
28                                 the static/private stuff.
29
30         Author:         E. Scott Daniels
31         Date:           29 November 2018
32 */
33
34 #ifndef rtable_static_c
35 #define rtable_static_c
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <netdb.h>
40 #include <errno.h>
41 #include <string.h>
42 #include <errno.h>
43 #include <fcntl.h>
44 #include <sys/types.h>
45 #include <sys/stat.h>
46 #include <unistd.h>
47
48
49 // -----------------------------------------------------------------------------------------------------
50
51 /*
52         Establish a TCP connection to the indicated target (IP address).
53         Target assumed to be address:port.  The new socket is returned via the
54         user supplied pointer so that a success/fail code is returned directly.
55         Return value is 0 (false) on failure, 1 (true)  on success.
56
57         In order to support multi-threaded user applications we must hold a lock before
58         we attempt to create a dialer and connect. NNG is thread safe, but we can
59         get things into a bad state if we allow a collision here.  The lock grab
60         only happens on the intial session setup.
61 */
62 //static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer, pthread_mutex* gate ) {
63 static int uta_link2( endpoint_t* ep ) {
64         static int      flags = -1;
65
66         char*           target;
67         nng_socket*     nn_sock;
68         nng_dialer*     dialer;
69         char            conn_info[NNG_MAXADDRLEN];      // string to give to nano to make the connection
70         char*           addr;
71         int                     state = FALSE;
72         char*           tok;
73
74         if( ep == NULL ) {
75                 return FALSE;
76         }
77
78         if( flags < 0 ) {
79                 tok = getenv( "RMR_ASYNC_CONN" );
80                 if( tok == NULL || *tok == '1' ) {
81                         flags = NNG_FLAG_NONBLOCK;                              // start dialer asynch
82                 } else {
83                         flags = NO_FLAGS;
84                 }
85         }
86
87         target = ep->name;                              // always give name to transport so chaning dest IP does not break reconnect
88         nn_sock = &ep->nn_sock;
89         dialer = &ep->dialer;
90
91         if( target == NULL  ||  (addr = strchr( target, ':' )) == NULL ) {              // bad address:port
92                 fprintf( stderr, "rmr: link2: unable to create link: bad target: %s\n", target == NULL ? "<nil>" : target );
93                 return FALSE;
94         }
95
96         if( nn_sock == NULL ) {
97                 errno = EINVAL;
98                 return FALSE;
99         }
100
101         pthread_mutex_lock( &ep->gate );                        // grab the lock
102         if( ep->open ) {
103                 pthread_mutex_unlock( &ep->gate );
104                 return TRUE;
105         }
106
107
108         if( nng_push0_open( nn_sock ) != 0 ) {                  // and assign the mode
109                 pthread_mutex_unlock( &ep->gate );
110                 fprintf( stderr, "[CRI] rmr: link2: unable to initialise nanomsg push socket to: %s\n", target );
111                 return FALSE;
112         }
113
114         snprintf( conn_info, sizeof( conn_info ), "tcp://%s", target );
115         if( (state = nng_dialer_create( dialer, *nn_sock, conn_info )) != 0 ) {
116                 pthread_mutex_unlock( &ep->gate );
117                 fprintf( stderr, "[WRN] rmr: link2: unable to create dialer for link to target: %s: %d\n", target, errno );
118                 nng_close( *nn_sock );
119                 return FALSE;
120         }
121
122         nng_dialer_setopt_ms( *dialer,  NNG_OPT_RECONNMAXT, 2000 );             // cap backoff on retries to reasonable amount (2s)
123         nng_dialer_setopt_ms( *dialer,  NNG_OPT_RECONNMINT, 100 );              // start retry 100m after last failure with 2s cap
124
125         if( (state = nng_dialer_start( *dialer, flags )) != 0 ) {                                               // can fail immediatly (unlike nanomsg)
126                 pthread_mutex_unlock( &ep->gate );
127                 fprintf( stderr, "[WRN] rmr: unable to create link to target: %s: %s\n", target, nng_strerror( state ) );
128                 nng_close( *nn_sock );
129                 return FALSE;
130         }
131
132         if( DEBUG ) fprintf( stderr, "[INFO] rmr_link2l: dial was successful: %s\n", target );
133
134         ep->open = TRUE;                                                // must set before release
135         pthread_mutex_unlock( &ep->gate );
136         return TRUE;
137 }
138
139 /*
140         This provides a protocol independent mechanism for establishing the connection to an endpoint.
141         Return is true (1) if the link was opened; false on error.
142 */
143 static int rt_link2_ep( endpoint_t* ep ) {
144         if( ep == NULL ) {
145                 return FALSE;
146         }
147
148         if( ep->open )  {                       // already open, do nothing
149                 return TRUE;
150         }
151
152         uta_link2( ep );
153         return ep->open;
154 }
155
156
157 /*
158         Add an endpoint to a route table entry for the group given. If the endpoint isn't in the
159         hash we add it and create the endpoint struct.
160
161         The caller must supply the specific route table (we assume it will be pending, but they
162         could live on the edge and update the active one, though that's not at all a good idea).
163 */
164 extern endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group  ) {
165         endpoint_t*     ep;
166         rrgroup_t* rrg;                         // pointer at group to update
167
168         if( ! rte || ! rt ) {
169                 fprintf( stderr, "[WRN] uda_add_ep didn't get a valid rt and/or rte pointer\n" );
170                 return NULL;
171         }
172
173         if( rte->nrrgroups <= group ) {
174                 fprintf( stderr, "[WRN] uda_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups );
175                 return NULL;
176         }
177
178         if( (rrg = rte->rrgroups[group]) == NULL ) {
179                 if( (rrg = (rrgroup_t *) malloc( sizeof( *rrg ) )) == NULL ) {
180                         fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for round robin group: group=%d\n", group );
181                         return NULL;
182                 }
183                 memset( rrg, 0, sizeof( *rrg ) );
184
185                 if( (rrg->epts = (endpoint_t **) malloc( sizeof( endpoint_t ) * MAX_EP_GROUP )) == NULL ) {
186                         fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group );
187                         return NULL;
188                 }
189                 memset( rrg->epts, 0, sizeof( endpoint_t ) * MAX_EP_GROUP );
190
191                 rte->rrgroups[group] = rrg;
192
193                 rrg->ep_idx = 0;                                                // next endpoint to send to
194                 rrg->nused = 0;                                                 // number populated
195                 rrg->nendpts = MAX_EP_GROUP;                    // number allocated
196         }
197
198         ep = rt_ensure_ep( rt, ep_name );                       // get the ep and create one if not known
199
200         if( rrg != NULL ) {
201                 if( rrg->nused >= rrg->nendpts ) {
202                         // future: reallocate
203                         fprintf( stderr, "[WRN] endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group );
204                         return NULL;
205                 }
206
207                 rrg->epts[rrg->nused] = ep;
208                 rrg->nused++;
209         }
210
211         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] endpoint added to mtype/group: %d/%d %s\n", rte->mtype, group, ep_name );
212         return ep;
213 }
214
215
216 /*
217         Given a name, find the nano socket needed to send to it. Returns the socket via
218         the user pointer passed in and sets the return value to true (1). If the
219         endpoint cannot be found false (0) is returned.
220 */
221 static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_sock ) {
222         endpoint_t* ep;
223         int state = FALSE;
224
225         if( rt == NULL ) {
226                 return FALSE;
227         }
228
229         ep =  rmr_sym_get( rt->hash, ep_name, 1 );
230         if( ep == NULL ) {
231                 if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s not in hash!\n", ep_name );
232                 if( ! ep_name || (ep = rt_ensure_ep( rt, ep_name)) == NULL ) {                          // create one if not in rt (support rts without entry in our table)
233                         return FALSE;
234                 }
235         }
236
237         if( ! ep->open )  {                                                                             // not open -- connect now
238                 if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s session not started... starting\n", ep_name );
239                 if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
240                         ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
241                 }
242                 if( uta_link2( ep ) ) {                                                                                 // find entry in table and create link
243                         state = TRUE;
244                         ep->open = TRUE;
245                         *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
246                 }
247                 if( DEBUG ) fprintf( stderr, "[DBUG] epsock_bn: connection state: %s %s\n", state ? "[OK]" : "[FAIL]", ep->name );
248         } else {
249                 *nn_sock = ep->nn_sock;
250                 state = TRUE;
251         }
252
253         return state;
254 }
255
256 /*
257         Make a round robin selection within a round robin group for a route table
258         entry. Returns the nanomsg socket if there is a rte for the message
259         key, and group is defined. Socket is returned via pointer in the parm
260         list (nn_sock).
261
262         The group is the group number to select from.
263
264         The user supplied (via pointer to) integer 'more' will be set if there are
265         additional groups beyond the one selected. This allows the caller to
266         to easily iterate over the group list -- more is set when the group should
267         be incremented and the function invoked again. Groups start at 0.
268
269         The return value is true (>0) if the socket was found and *nn_sock was updated
270         and false (0) if there is no associated socket for the msg type, group combination.
271         We return the index+1 from the round robin table on success so that we can verify
272         during test that different entries are being seleted; we cannot depend on the nng
273         socket being different as we could with nano.
274
275         NOTE:   The round robin selection index increment might collide with other
276                 threads if multiple threads are attempting to send to the same round
277                 robin group; the consequences are small and avoid locking. The only side
278                 effect is either sending two messages in a row to, or skipping, an endpoint.
279                 Both of these, in the grand scheme of things, is minor compared to the
280                 overhead of grabbing a lock on each call.
281 */
282 static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, nng_socket* nn_sock ) {
283         //rtable_ent_t* rte;                    // matching rt entry
284         endpoint_t*     ep;                             // seected end point
285         int  state = FALSE;                     // processing state
286         int dummy;
287         rrgroup_t* rrg;
288         int     idx;
289
290
291         if( ! more ) {                          // eliminate cheks each time we need to use
292                 more = &dummy;
293         }
294
295         if( ! nn_sock ) {                       // user didn't supply a pointer
296                 errno = EINVAL;
297                 *more = 0;
298                 return FALSE;
299         }
300
301         if( rte == NULL ) {
302                 *more = 0;
303                 return FALSE;
304         }
305
306         if( group < 0 || group >= rte->nrrgroups ) {
307                 //if( DEBUG ) fprintf( stderr, ">>>> group out of range: group=%d max=%d\n", group, rte->nrrgroups );
308                 *more = 0;
309                 return FALSE;
310         }
311
312         if( (rrg = rte->rrgroups[group]) == NULL ) {
313                 //if( DEBUG ) fprintf( stderr, ">>>> rrg not found for group \n", group );
314                 *more = 0;                                      // groups are inserted contig, so nothing should be after a nil pointer
315                 return FALSE;
316         }
317
318         *more = group < rte->nrrgroups-1 ? (rte->rrgroups[group+1] != NULL): 0; // more if something in next group slot
319
320         switch( rrg->nused ) {
321                 case 0:                         // nothing allocated, just punt
322                         //if( DEBUG ) fprintf( stderr, ">>>> nothing allocated for the rrg\n" );
323                         return FALSE;
324
325                 case 1:                         // exactly one, no rr to deal with
326                         ep = rrg->epts[0];
327                         //if( DEBUG ) fprintf( stderr, ">>>> _rr returning socket with one choice in group \n" );
328                         state = TRUE;
329                         break;
330
331                 default:                                                                                // need to pick one and adjust rr counts
332
333                         idx = rrg->ep_idx++ % rrg->nused;                       // see note above
334                         ep = rrg->epts[idx];                                            // select next endpoint
335                         //if( DEBUG ) fprintf( stderr, ">>>> _rr returning socket with multiple choices in group idx=%d \n", rrg->ep_idx );
336                         state = idx + 1;                                                        // unit test checks to see that we're cycling through, so must not just be TRUE
337                         break;
338         }
339
340         if( state ) {                                                                           // end point selected, open if not, get socket either way
341                 if( ! ep->open ) {                                                              // not connected
342                         if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
343                                 ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
344                         }
345
346                         if( uta_link2( ep ) ) {                                                                                 // find entry in table and create link
347                                 ep->open = TRUE;
348                                 *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
349                         } else {
350                                 state = FALSE;
351                         }
352                         if( DEBUG ) fprintf( stderr, "[DBUG] epsock_rr: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" );
353                 } else {
354                         *nn_sock = ep->nn_sock;
355                 }
356         }
357
358         return state;
359 }
360
361 /*
362         Finds the rtable entry which matches the key. Returns a nil pointer if
363         no entry is found. If try_alternate is set, then we will attempt 
364         to find the entry with a key based only on the message type.
365 */
366 static inline rtable_ent_t*  uta_get_rte( route_table_t *rt, int sid, int mtype, int try_alt ) {
367         uint64_t key;                   // key is sub id and mtype banged together
368         rtable_ent_t* rte;              // the entry we found
369
370         if( rt == NULL || rt->hash == NULL ) {
371                 return NULL;
372         }
373
374         key = build_rt_key( sid, mtype );                                                                                       // first try with a 'full' key
375         if( ((rte = rmr_sym_pull( rt->hash, key )) != NULL)  ||  ! try_alt ) {          // found or not allowed to try the alternate, return what we have
376                 return rte;
377         }
378
379         if( sid != UNSET_SUBID ) {                                                              // not found, and allowed to try alternate; and the sub_id was set
380                 key = build_rt_key( UNSET_SUBID, mtype );                       // rebuild key
381                 rte = rmr_sym_pull( rt->hash, key );                            // see what we get with this
382         }
383
384         return rte;
385 }
386
387 #endif