6215176a87d3dc47223c0d76bf77fc43702b013d
[ric-plt/lib/rmr.git] / src / rmr / si / src / rtable_si_static.c
1 // vim: ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rtable_si_static.c
23         Abstract:       Route table management functions which depend on the underlying
24                                 transport mechanism and thus cannot be included with the generic
25                                 route table functions.
26
27                                 This module is designed to be included by any module (main) needing
28                                 the static/private stuff.
29
30         Author:         E. Scott Daniels
31         Date:           29 November 2018
32 */
33
34 #ifndef rtable_static_c
35 #define rtable_static_c
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <netdb.h>
40 #include <errno.h>
41 #include <string.h>
42 #include <errno.h>
43 #include <fcntl.h>
44 #include <sys/types.h>
45 #include <sys/stat.h>
46 #include <unistd.h>
47
48
49 // -----------------------------------------------------------------------------------------------------
50
51 /*
52         Mark an endpoint closed because it's in a failing state.
53 */
54 static void uta_ep_failed( endpoint_t* ep ) {
55         if( ep != NULL ) {
56                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "connection to %s was closed\n", ep->name );
57                 ep->open = FALSE;
58         }
59 }
60
61 /*
62         Establish a TCP connection to the indicated target (IP address).
63         Target assumed to be address:port.  The new socket is returned via the
64         user supplied pointer so that a success/fail code is returned directly.
65         Return value is 0 (false) on failure, 1 (true)  on success.
66
67         In order to support multi-threaded user applications we must hold a lock before
68         we attempt to create a dialer and connect. NNG is thread safe, but we can
69         get things into a bad state if we allow a collision here.  The lock grab
70         only happens on the intial session setup.
71 */
72 static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) {
73         static int      flags = 0;
74
75         char*           target;
76         char            conn_info[SI_MAX_ADDR_LEN];     // string to give to nano to make the connection
77         char*           addr;
78         int                     state = FALSE;
79         char*           tok;
80
81         if( ep == NULL ) {
82                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "link2 ep was nil!\n" );
83                 return FALSE;
84         }
85
86         target = ep->name;                              // always give name to transport so changing dest IP does not break reconnect
87         if( target == NULL  ||  (addr = strchr( target, ':' )) == NULL ) {              // bad address:port
88                 if( ep->notify ) {
89                         rmr_vlog( RMR_VL_WARN, "rmr: link2: unable to create link: bad target: %s\n", target == NULL ? "<nil>" : target );
90                         ep->notify = 0;
91                 }
92                 return FALSE;
93         }
94
95         pthread_mutex_lock( &ep->gate );                        // grab the lock
96         if( ep->open ) {
97                 pthread_mutex_unlock( &ep->gate );
98                 return TRUE;
99         }
100
101         snprintf( conn_info, sizeof( conn_info ), "%s", target );
102         errno = 0;
103         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "link2 attempting connection with: %s\n", conn_info );
104         if( (ep->nn_sock = SIconnect( si_ctx, conn_info )) < 0 ) {
105                 pthread_mutex_unlock( &ep->gate );
106
107                 if( ep->notify ) {                                                      // need to notify if set
108                         rmr_vlog( RMR_VL_WARN, "rmr: link2: unable to connect  to target: %s: %d %s\n", target, errno, strerror( errno ) );
109                         ep->notify = 0;
110                 }
111                 return FALSE;
112         }
113
114         if( DEBUG ) rmr_vlog( RMR_VL_INFO, "rmr_si_link2: connection was successful to: %s\n", target );
115
116         ep->open = TRUE;                                                // set open/notify before giving up lock
117
118         if( ! ep->notify ) {                                            // if we yammered about a failure, indicate finally good
119                 rmr_vlog( RMR_VL_INFO, "rmr: link2: connection finally establisehd with target: %s\n", target );
120                 ep->notify = 1;
121         }
122
123         pthread_mutex_unlock( &ep->gate );
124         return TRUE;
125 }
126
127 /*
128         This provides a protocol independent mechanism for establishing the connection to an endpoint.
129         Return is true (1) if the link was opened; false on error.
130 */
131 static int rt_link2_ep( void* vctx, endpoint_t* ep ) {
132         uta_ctx_t* ctx;
133
134         if( ep == NULL ) {
135                 return FALSE;
136         }
137
138         if( ep->open )  {                       // already open, do nothing
139                 return TRUE;
140         }
141
142         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
143                 return FALSE;
144         }
145
146         uta_link2( ctx->si_ctx, ep );
147         if( ep->open ) {
148                 fd2ep_add( ctx, ep->nn_sock, ep );              // map fd to ep for disc cleanup
149         }
150         return ep->open;
151 }
152
153
154 /*
155         Add an endpoint to a route table entry for the group given. If the endpoint isn't in the
156         hash we add it and create the endpoint struct.
157
158         The caller must supply the specific route table (we assume it will be pending, but they
159         could live on the edge and update the active one, though that's not at all a good idea).
160 */
161 extern endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group  ) {
162         endpoint_t*     ep;
163         rrgroup_t* rrg;                         // pointer at group to update
164
165         if( ! rte || ! rt ) {
166                 rmr_vlog( RMR_VL_WARN, "uda_add_ep didn't get a valid rt and/or rte pointer\n" );
167                 return NULL;
168         }
169
170         if( rte->nrrgroups <= group || group < 0 ) {
171                 rmr_vlog( RMR_VL_WARN, "uda_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups );
172                 return NULL;
173         }
174
175         //fprintf( stderr, ">>>> add ep grp=%d to rte @ 0x%p  rrg=%p\n", group, rte, rte->rrgroups[group] );
176         if( (rrg = rte->rrgroups[group]) == NULL ) {
177                 if( (rrg = (rrgroup_t *) malloc( sizeof( *rrg ) )) == NULL ) {
178                         rmr_vlog( RMR_VL_WARN, "rmr_add_ep: malloc failed for round robin group: group=%d\n", group );
179                         return NULL;
180                 }
181                 memset( rrg, 0, sizeof( *rrg ) );
182
183                 if( (rrg->epts = (endpoint_t **) malloc( sizeof( endpoint_t ) * MAX_EP_GROUP )) == NULL ) {
184                         rmr_vlog( RMR_VL_WARN, "rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group );
185                         return NULL;
186                 }
187                 memset( rrg->epts, 0, sizeof( endpoint_t ) * MAX_EP_GROUP );
188
189                 rte->rrgroups[group] = rrg;
190                 //fprintf( stderr, ">>>> added new rrg grp=%d to rte @ 0x%p  rrg=%p\n", group, rte, rte->rrgroups[group] );
191
192                 rrg->ep_idx = 0;                                                // next endpoint to send to
193                 rrg->nused = 0;                                                 // number populated
194                 rrg->nendpts = MAX_EP_GROUP;                    // number allocated
195
196                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rrg added to rte: mtype=%d group=%d\n", rte->mtype, group );
197         }
198
199         ep = rt_ensure_ep( rt, ep_name );                       // get the ep and create one if not known
200
201         if( rrg != NULL ) {
202                 if( rrg->nused >= rrg->nendpts ) {
203                         // future: reallocate
204                         rmr_vlog( RMR_VL_WARN, "endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group );
205                         return NULL;
206                 }
207
208                 rrg->epts[rrg->nused] = ep;
209                 rrg->nused++;
210         }
211
212         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "endpoint added to mtype/group: %d/%d %s nused=%d\n", rte->mtype, group, ep_name, rrg->nused );
213         return ep;
214 }
215
216
217 /*
218         Given a name, find the nano socket needed to send to it. Returns the socket via
219         the user pointer passed in and sets the return value to true (1). If the
220         endpoint cannot be found false (0) is returned.
221 */
222 static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpoint_t** uepp ) {
223         route_table_t*  rt; 
224         si_ctx_t*               si_ctx;
225         endpoint_t*             ep;
226         int                             state = FALSE;
227
228         if( PARINOID_CHECKS ) {
229                 if( ctx == NULL || (rt = ctx->rtable) == NULL || (si_ctx = ctx->si_ctx) == NULL  ) {
230                         return FALSE;
231                 }
232         } else {
233                 rt = ctx->rtable;                               // faster but more risky
234                 si_ctx = ctx->si_ctx;
235         }
236
237         ep =  rmr_sym_get( rt->hash, ep_name, 1 );
238         if( uepp != NULL ) {                                                    // caller needs endpoint too, give it back
239                 *uepp = ep;
240         }
241         if( ep == NULL ) {
242                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "get ep by name for %s not in hash!\n", ep_name );
243                 if( ! ep_name || (ep = rt_ensure_ep( rt, ep_name)) == NULL ) {                          // create one if not in rt (support rts without entry in our table)
244                         return FALSE;
245                 }
246         }
247
248         if( ! ep->open )  {                                                                             // not open -- connect now
249                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "get ep by name for %s session not started... starting\n", ep_name );
250                 if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
251                         ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
252                 }
253                 if( uta_link2( si_ctx, ep ) ) {                                                                                 // find entry in table and create link
254                         state = TRUE;
255                         ep->open = TRUE;
256                         *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
257                         fd2ep_add( ctx, ep->nn_sock, ep );                              // map fd to this ep for disc cleanup
258                 }
259                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_bn: connection state: %s %s\n", state ? "[OK]" : "[FAIL]", ep->name );
260         } else {
261                 *nn_sock = ep->nn_sock;
262                 state = TRUE;
263         }
264
265         return state;
266 }
267
268 /*
269         Make a round robin selection within a round robin group for a route table
270         entry. Returns the nanomsg socket if there is a rte for the message
271         key, and group is defined. Socket is returned via pointer in the parm
272         list (nn_sock).
273
274         The group is the group number to select from.
275
276         The user supplied (via pointer to) integer 'more' will be set if there are
277         additional groups beyond the one selected. This allows the caller to
278         to easily iterate over the group list -- more is set when the group should
279         be incremented and the function invoked again. Groups start at 0.
280
281         The return value is true (>0) if the socket was found and *nn_sock was updated
282         and false (0) if there is no associated socket for the msg type, group combination.
283         We return the index+1 from the round robin table on success so that we can verify
284         during test that different entries are being seleted; we cannot depend on the nng
285         socket being different as we could with nano.
286
287         NOTE:   The round robin selection index increment might collide with other
288                 threads if multiple threads are attempting to send to the same round
289                 robin group; the consequences are small and avoid locking. The only side
290                 effect is either sending two messages in a row to, or skipping, an endpoint.
291                 Both of these, in the grand scheme of things, is minor compared to the
292                 overhead of grabbing a lock on each call.
293 */
294 static int uta_epsock_rr( uta_ctx_t* ctx, rtable_ent_t* rte, int group, int* more, int* nn_sock, endpoint_t** uepp ) {
295         si_ctx_t*               si_ctx;
296         endpoint_t*     ep;                             // selected end point
297         int  state = FALSE;                     // processing state
298         int dummy;
299         rrgroup_t* rrg;
300         int     idx;
301
302         if( PARINOID_CHECKS ) {
303                 if( ctx == NULL || (si_ctx = ctx->si_ctx) == NULL  ) {
304                         return FALSE;
305                 }
306         } else {
307                 si_ctx = ctx->si_ctx;
308         }
309
310         //fprintf( stderr, ">>>> epsock_rr selecting: grp=%d mtype=%d ngrps=%d\n", group, rte->mtype, rte->nrrgroups );
311
312         if( ! more ) {                          // eliminate cheks each time we need to use
313                 more = &dummy;
314         }
315
316         if( ! nn_sock ) {                       // user didn't supply a pointer
317                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr invalid nnsock pointer\n" );
318                 errno = EINVAL;
319                 *more = 0;
320                 return FALSE;
321         }
322
323         if( rte == NULL ) {
324                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr rte was nil; nothing selected\n" );
325                 *more = 0;
326                 return FALSE;
327         }
328
329         if( group < 0 || group >= rte->nrrgroups ) {
330                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "group out of range: group=%d max=%d\n", group, rte->nrrgroups );
331                 *more = 0;
332                 return FALSE;
333         }
334
335         if( (rrg = rte->rrgroups[group]) == NULL ) {
336                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rrg not found for group %d (ptr rrgroups[g] == nil)\n", group );
337                 *more = 0;                                      // groups are inserted contig, so nothing should be after a nil pointer
338                 return FALSE;
339         }
340
341         *more = group < rte->nrrgroups-1 ? (rte->rrgroups[group+1] != NULL): 0; // more if something in next group slot
342
343         switch( rrg->nused ) {
344                 case 0:                         // nothing allocated, just punt
345                         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "nothing allocated for the rrg\n" );
346                         return FALSE;
347
348                 case 1:                         // exactly one, no rr to deal with
349                         ep = rrg->epts[0];
350                         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "_rr returning socket with one choice in group \n" );
351                         state = TRUE;
352                         break;
353
354                 default:                                                                                // need to pick one and adjust rr counts
355                         idx = rrg->ep_idx++ % rrg->nused;                       // see note above
356                         ep = rrg->epts[idx];                                            // select next endpoint
357                         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "_rr returning socket with multiple choices in group idx=%d \n", rrg->ep_idx );
358                         state = idx + 1;                                                        // unit test checks to see that we're cycling through, so must not just be TRUE
359                         break;
360         }
361
362         if( uepp != NULL ) {                                                            // caller may need refernce to endpoint too; give it if pointer supplied
363                 *uepp = ep;
364         }
365         if( state ) {                                                                           // end point selected, open if not, get socket either way
366                 if( ! ep->open ) {                                                              // not connected
367                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr selected endpoint not yet open; opening %s\n", ep->name );
368                         if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
369                                 ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
370                         }
371
372                         if( uta_link2( si_ctx, ep ) ) {                                                                                 // find entry in table and create link
373                                 ep->open = TRUE;
374                                 *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
375                                 fd2ep_add( ctx, ep->nn_sock, ep );                              // map fd to ep for disc cleanup
376                         } else {
377                                 state = FALSE;
378                         }
379                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" );
380                 } else {
381                         *nn_sock = ep->nn_sock;
382                 }
383         }
384
385         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr returns state=%d\n", state );
386         return state;
387 }
388
389 /*
390         Given a message, use the meid field to find the owner endpoint for the meid.
391         The owner ep is then used to extract the socket through which the message
392         is sent. This returns TRUE if we found a socket and it was written to the
393         nn_sock pointer; false if we didn't.
394
395         We've been told that the meid is a string, thus we count on it being a nil
396         terminated set of bytes.
397 */
398 static int epsock_meid( uta_ctx_t* ctx, route_table_t *rtable, rmr_mbuf_t* msg, int* nn_sock, endpoint_t** uepp ) {
399         endpoint_t*     ep;                             // seected end point
400         int     state = FALSE;                  // processing state
401         char*   meid;
402         si_ctx_t*       si_ctx;
403
404         if( PARINOID_CHECKS ) {
405                 if( ctx == NULL || (si_ctx = ctx->si_ctx) == NULL  ) {
406                         return FALSE;
407                 }
408         } else {
409                 si_ctx = ctx->si_ctx;
410         }
411
412         errno = 0;
413         if( ! nn_sock || msg == NULL || rtable == NULL ) {                      // missing stuff; bail fast
414                 errno = EINVAL;
415                 return FALSE;
416         }
417
418         meid = ((uta_mhdr_t *) msg->header)->meid;
419
420         if( (ep = get_meid_owner( rtable, meid )) == NULL ) {
421                 if( uepp != NULL ) {                                                            // caller needs refernce to endpoint too
422                         *uepp = NULL;
423                 }
424
425                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_meid: no ep in hash for (%s)\n", meid );
426                 return FALSE;
427         }
428
429         state = TRUE;
430         if( ! ep->open ) {                                                              // not connected
431                 if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
432                         ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
433                 }
434
435                 if( uta_link2( si_ctx, ep ) ) {                         // find entry in table and create link
436                         ep->open = TRUE;
437                         *nn_sock = ep->nn_sock;                                 // pass socket back to caller
438                 } else {
439                         state = FALSE;
440                 }
441                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_meid: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" );
442         } else {
443                 *nn_sock = ep->nn_sock;
444         }
445
446         return state;
447 }
448
449 /*
450         Finds the rtable entry which matches the key. Returns a nil pointer if
451         no entry is found. If try_alternate is set, then we will attempt 
452         to find the entry with a key based only on the message type.
453 */
454 static inline rtable_ent_t*  uta_get_rte( route_table_t *rt, int sid, int mtype, int try_alt ) {
455         uint64_t key;                   // key is sub id and mtype banged together
456         rtable_ent_t* rte;              // the entry we found
457
458         if( rt == NULL || rt->hash == NULL ) {
459                 return NULL;
460         }
461
462         key = build_rt_key( sid, mtype );                                                                                       // first try with a 'full' key
463         if( ((rte = rmr_sym_pull( rt->hash, key )) != NULL)  ||  ! try_alt ) {          // found or not allowed to try the alternate, return what we have
464                 return rte;
465         }
466
467         if( sid != UNSET_SUBID ) {                                                              // not found, and allowed to try alternate; and the sub_id was set
468                 key = build_rt_key( UNSET_SUBID, mtype );                       // rebuild key
469                 rte = rmr_sym_pull( rt->hash, key );                            // see what we get with this
470         }
471
472         return rte;
473 }
474
475 /*
476         Return a string of count information. E.g.:
477                 <ep-name>:<port> <good> <hard-fail> <soft-fail>
478
479         Caller must free the string allocated if a buffer was not provided.
480
481         Pointer returned is to a freshly allocated string, or the user buffer
482         for convenience.
483
484         If the endpoint passed is a nil pointer, then we return a nil -- caller
485         must check!
486 */
487 static inline char* get_ep_counts( endpoint_t* ep, char* ubuf, int ubuf_len ) {
488         char*   rs;                     // result string
489
490         if( ep == NULL ) {
491                 return NULL;
492         }
493
494         if( ubuf != NULL ) {
495                 rs = ubuf;
496         } else {
497                 ubuf_len = 256;
498                 rs = malloc( sizeof( char ) * ubuf_len );
499         }
500
501         snprintf( rs, ubuf_len, "%s %lld %lld %lld", ep->name, ep->scounts[EPSC_GOOD], ep->scounts[EPSC_FAIL], ep->scounts[EPSC_TRANS] );
502
503         return rs;
504 }
505
506
507 // ---- fd to ep functions --------------------------------------------------------------------------
508
509 /*
510         Create the hash which maps file descriptors to endpoints. We need this
511         to easily mark an endpoint as disconnected when we are notified.
512 */
513 static void fd2ep_init( uta_ctx_t* ctx ) {
514         if( ctx  && ! ctx->fd2ep ) {
515                 ctx->fd2ep = rmr_sym_alloc( 129 );              
516         }
517 }
518
519 /*
520         Add an entry into the fd2ep hash which points to the given ep.
521 */
522 static void fd2ep_add( uta_ctx_t* ctx, int fd, endpoint_t* ep ) {
523         if( ctx && ctx->fd2ep ) {
524                 rmr_sym_map( ctx->fd2ep, (uint64_t) fd, (void *) ep );
525         }
526 }
527
528 /*
529         Given a file descriptor fetches the related endpoint from the hash and 
530         deletes the entry from the hash (when we detect a disconnect).  
531 */
532 static endpoint_t*  fd2ep_del( uta_ctx_t* ctx, int fd ) {
533         endpoint_t* ep = NULL;
534
535         if( ctx && ctx->fd2ep ) {
536                 ep = rmr_sym_pull(  ctx->fd2ep, (uint64_t) fd );
537                 if( ep ) {
538                         rmr_sym_ndel(  ctx->fd2ep, (uint64_t) fd );
539                 }
540         }
541
542         return ep;
543 }
544
545 /*
546         Given a file descriptor fetches the related endpoint from the hash.
547         Returns nil if there is no map.
548 */
549 static endpoint_t*  fd2ep_get( uta_ctx_t* ctx, int fd ) {
550         endpoint_t* ep = NULL;
551
552         if( ctx && ctx->fd2ep ) {
553                 ep = rmr_sym_pull(  ctx->fd2ep, (uint64_t) fd );
554         }
555
556         return ep;
557 }
558
559
560 #endif