Add ability to track send counts for an endpoint
[ric-plt/lib/rmr.git] / src / rmr / nng / src / rtable_nng_static.c
1 // vim: ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rtable_nng_static.c
23         Abstract:       Route table management functions which depend on the underlying
24                                 transport mechanism and thus cannot be included with the generic
25                                 route table functions.
26
27                                 This module is designed to be included by any module (main) needing
28                                 the static/private stuff.
29
30         Author:         E. Scott Daniels
31         Date:           29 November 2018
32 */
33
34 #ifndef rtable_static_c
35 #define rtable_static_c
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <netdb.h>
40 #include <errno.h>
41 #include <string.h>
42 #include <errno.h>
43 #include <fcntl.h>
44 #include <sys/types.h>
45 #include <sys/stat.h>
46 #include <unistd.h>
47
48
49 // -----------------------------------------------------------------------------------------------------
50
51 /*
52         Establish a TCP connection to the indicated target (IP address).
53         Target assumed to be address:port.  The new socket is returned via the
54         user supplied pointer so that a success/fail code is returned directly.
55         Return value is 0 (false) on failure, 1 (true)  on success.
56
57         In order to support multi-threaded user applications we must hold a lock before
58         we attempt to create a dialer and connect. NNG is thread safe, but we can
59         get things into a bad state if we allow a collision here.  The lock grab
60         only happens on the intial session setup.
61 */
62 static int uta_link2( endpoint_t* ep ) {
63         static int      flags = -1;
64
65         char*           target;
66         nng_socket*     nn_sock;
67         nng_dialer*     dialer;
68         char            conn_info[NNG_MAXADDRLEN];      // string to give to nano to make the connection
69         char*           addr;
70         int                     state = FALSE;
71         char*           tok;
72
73         if( ep == NULL ) {
74                 return FALSE;
75         }
76
77         if( flags < 0 ) {
78                 tok = getenv( "RMR_ASYNC_CONN" );
79                 if( tok == NULL || *tok == '1' ) {
80                         flags = NNG_FLAG_NONBLOCK;                              // start dialer asynch
81                 } else {
82                         flags = NO_FLAGS;
83                 }
84         }
85
86         target = ep->name;                              // always give name to transport so chaning dest IP does not break reconnect
87         nn_sock = &ep->nn_sock;
88         dialer = &ep->dialer;
89
90         if( target == NULL  ||  (addr = strchr( target, ':' )) == NULL ) {              // bad address:port
91                 fprintf( stderr, "rmr: link2: unable to create link: bad target: %s\n", target == NULL ? "<nil>" : target );
92                 return FALSE;
93         }
94
95         if( nn_sock == NULL ) {
96                 errno = EINVAL;
97                 return FALSE;
98         }
99
100         pthread_mutex_lock( &ep->gate );                        // grab the lock
101         if( ep->open ) {
102                 pthread_mutex_unlock( &ep->gate );
103                 return TRUE;
104         }
105
106
107         if( nng_push0_open( nn_sock ) != 0 ) {                  // and assign the mode
108                 pthread_mutex_unlock( &ep->gate );
109                 fprintf( stderr, "[CRI] rmr: link2: unable to initialise nanomsg push socket to: %s\n", target );
110                 return FALSE;
111         }
112
113         snprintf( conn_info, sizeof( conn_info ), "tcp://%s", target );
114         if( (state = nng_dialer_create( dialer, *nn_sock, conn_info )) != 0 ) {
115                 pthread_mutex_unlock( &ep->gate );
116                 fprintf( stderr, "[WRN] rmr: link2: unable to create dialer for link to target: %s: %d\n", target, errno );
117                 nng_close( *nn_sock );
118                 return FALSE;
119         }
120
121         nng_dialer_setopt_ms( *dialer,  NNG_OPT_RECONNMAXT, 2000 );             // cap backoff on retries to reasonable amount (2s)
122         nng_dialer_setopt_ms( *dialer,  NNG_OPT_RECONNMINT, 100 );              // start retry 100m after last failure with 2s cap
123
124         if( (state = nng_dialer_start( *dialer, flags )) != 0 ) {                                               // can fail immediatly (unlike nanomsg)
125                 pthread_mutex_unlock( &ep->gate );
126                 fprintf( stderr, "[WRN] rmr: unable to create link to target: %s: %s\n", target, nng_strerror( state ) );
127                 nng_close( *nn_sock );
128                 return FALSE;
129         }
130
131         if( DEBUG ) fprintf( stderr, "[INFO] rmr_link2l: dial was successful: %s\n", target );
132
133         ep->open = TRUE;                                                // must set before release
134         pthread_mutex_unlock( &ep->gate );
135         return TRUE;
136 }
137
138 /*
139         This provides a protocol independent mechanism for establishing the connection to an endpoint.
140         Return is true (1) if the link was opened; false on error.
141 */
142 static int rt_link2_ep( endpoint_t* ep ) {
143         if( ep == NULL ) {
144                 return FALSE;
145         }
146
147         if( ep->open )  {                       // already open, do nothing
148                 return TRUE;
149         }
150
151         uta_link2( ep );
152         return ep->open;
153 }
154
155
156 /*
157         Add an endpoint to a route table entry for the group given. If the endpoint isn't in the
158         hash we add it and create the endpoint struct.
159
160         The caller must supply the specific route table (we assume it will be pending, but they
161         could live on the edge and update the active one, though that's not at all a good idea).
162 */
163 extern endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group  ) {
164         endpoint_t*     ep;
165         rrgroup_t* rrg;                         // pointer at group to update
166
167         if( ! rte || ! rt ) {
168                 fprintf( stderr, "[WRN] uda_add_ep didn't get a valid rt and/or rte pointer\n" );
169                 return NULL;
170         }
171
172         if( rte->nrrgroups <= group ) {
173                 fprintf( stderr, "[WRN] uda_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups );
174                 return NULL;
175         }
176
177         if( (rrg = rte->rrgroups[group]) == NULL ) {
178                 if( (rrg = (rrgroup_t *) malloc( sizeof( *rrg ) )) == NULL ) {
179                         fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for round robin group: group=%d\n", group );
180                         return NULL;
181                 }
182                 memset( rrg, 0, sizeof( *rrg ) );
183
184                 if( (rrg->epts = (endpoint_t **) malloc( sizeof( endpoint_t ) * MAX_EP_GROUP )) == NULL ) {
185                         fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group );
186                         return NULL;
187                 }
188                 memset( rrg->epts, 0, sizeof( endpoint_t ) * MAX_EP_GROUP );
189
190                 rte->rrgroups[group] = rrg;
191
192                 rrg->ep_idx = 0;                                                // next endpoint to send to
193                 rrg->nused = 0;                                                 // number populated
194                 rrg->nendpts = MAX_EP_GROUP;                    // number allocated
195         }
196
197         ep = rt_ensure_ep( rt, ep_name );                       // get the ep and create one if not known
198
199         if( rrg != NULL ) {
200                 if( rrg->nused >= rrg->nendpts ) {
201                         // future: reallocate
202                         fprintf( stderr, "[WRN] endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group );
203                         return NULL;
204                 }
205
206                 rrg->epts[rrg->nused] = ep;
207                 rrg->nused++;
208         }
209
210         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] endpoint added to mtype/group: %d/%d %s\n", rte->mtype, group, ep_name );
211         return ep;
212 }
213
214
215 /*
216         Given a name, find the nano socket needed to send to it. Returns the socket via
217         the user pointer passed in and sets the return value to true (1). If the
218         endpoint cannot be found false (0) is returned.
219 */
220 static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_sock, endpoint_t** uepp ) {
221         endpoint_t* ep;
222         int state = FALSE;
223
224         if( rt == NULL ) {
225                 return FALSE;
226         }
227
228         ep =  rmr_sym_get( rt->hash, ep_name, 1 );
229         if( uepp != NULL ) {                                                    // caller needs endpoint too, give it back
230                 *uepp = ep;
231         }
232         if( ep == NULL ) {
233                 if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s not in hash!\n", ep_name );
234                 if( ! ep_name || (ep = rt_ensure_ep( rt, ep_name)) == NULL ) {                          // create one if not in rt (support rts without entry in our table)
235                         return FALSE;
236                 }
237         }
238
239         if( ! ep->open )  {                                                                             // not open -- connect now
240                 if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s session not started... starting\n", ep_name );
241                 if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
242                         ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
243                 }
244                 if( uta_link2( ep ) ) {                                                                                 // find entry in table and create link
245                         state = TRUE;
246                         ep->open = TRUE;
247                         *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
248                 }
249                 if( DEBUG ) fprintf( stderr, "[DBUG] epsock_bn: connection state: %s %s\n", state ? "[OK]" : "[FAIL]", ep->name );
250         } else {
251                 *nn_sock = ep->nn_sock;
252                 state = TRUE;
253         }
254
255         return state;
256 }
257
258 /*
259         Make a round robin selection within a round robin group for a route table
260         entry. Returns the nanomsg socket if there is a rte for the message
261         key, and group is defined. Socket is returned via pointer in the parm
262         list (nn_sock).
263
264         The group is the group number to select from.
265
266         The user supplied (via pointer to) integer 'more' will be set if there are
267         additional groups beyond the one selected. This allows the caller to
268         to easily iterate over the group list -- more is set when the group should
269         be incremented and the function invoked again. Groups start at 0.
270
271         The return value is true (>0) if the socket was found and *nn_sock was updated
272         and false (0) if there is no associated socket for the msg type, group combination.
273         We return the index+1 from the round robin table on success so that we can verify
274         during test that different entries are being seleted; we cannot depend on the nng
275         socket being different as we could with nano.
276
277         NOTE:   The round robin selection index increment might collide with other
278                 threads if multiple threads are attempting to send to the same round
279                 robin group; the consequences are small and avoid locking. The only side
280                 effect is either sending two messages in a row to, or skipping, an endpoint.
281                 Both of these, in the grand scheme of things, is minor compared to the
282                 overhead of grabbing a lock on each call.
283 */
284 static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, nng_socket* nn_sock, endpoint_t** uepp ) {
285         endpoint_t*     ep;                             // seected end point
286         int  state = FALSE;                     // processing state
287         int dummy;
288         rrgroup_t* rrg;
289         int     idx;
290
291
292         if( ! more ) {                          // eliminate cheks each time we need to use
293                 more = &dummy;
294         }
295
296         if( ! nn_sock ) {                       // user didn't supply a pointer
297                 errno = EINVAL;
298                 *more = 0;
299                 return FALSE;
300         }
301
302         if( rte == NULL ) {
303                 *more = 0;
304                 return FALSE;
305         }
306
307         if( group < 0 || group >= rte->nrrgroups ) {
308                 //if( DEBUG ) fprintf( stderr, ">>>> group out of range: group=%d max=%d\n", group, rte->nrrgroups );
309                 *more = 0;
310                 return FALSE;
311         }
312
313         if( (rrg = rte->rrgroups[group]) == NULL ) {
314                 //if( DEBUG ) fprintf( stderr, ">>>> rrg not found for group \n", group );
315                 *more = 0;                                      // groups are inserted contig, so nothing should be after a nil pointer
316                 return FALSE;
317         }
318
319         *more = group < rte->nrrgroups-1 ? (rte->rrgroups[group+1] != NULL): 0; // more if something in next group slot
320
321         switch( rrg->nused ) {
322                 case 0:                         // nothing allocated, just punt
323                         //if( DEBUG ) fprintf( stderr, ">>>> nothing allocated for the rrg\n" );
324                         return FALSE;
325
326                 case 1:                         // exactly one, no rr to deal with
327                         ep = rrg->epts[0];
328                         //if( DEBUG ) fprintf( stderr, ">>>> _rr returning socket with one choice in group \n" );
329                         state = TRUE;
330                         break;
331
332                 default:                                                                                // need to pick one and adjust rr counts
333
334                         idx = rrg->ep_idx++ % rrg->nused;                       // see note above
335                         ep = rrg->epts[idx];                                            // select next endpoint
336                         //if( DEBUG ) fprintf( stderr, ">>>> _rr returning socket with multiple choices in group idx=%d \n", rrg->ep_idx );
337                         state = idx + 1;                                                        // unit test checks to see that we're cycling through, so must not just be TRUE
338                         break;
339         }
340
341         if( uepp != NULL ) {                                                            // caller needs refernce to endpoint too
342                 *uepp = ep;
343         }
344         if( state ) {                                                                           // end point selected, open if not, get socket either way
345                 if( ! ep->open ) {                                                              // not connected
346                         if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
347                                 ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
348                         }
349
350                         if( uta_link2( ep ) ) {                                                                                 // find entry in table and create link
351                                 ep->open = TRUE;
352                                 *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
353                         } else {
354                                 state = FALSE;
355                         }
356                         if( DEBUG ) fprintf( stderr, "[DBUG] epsock_rr: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" );
357                 } else {
358                         *nn_sock = ep->nn_sock;
359                 }
360         }
361
362         return state;
363 }
364
365 /*
366         Finds the rtable entry which matches the key. Returns a nil pointer if
367         no entry is found. If try_alternate is set, then we will attempt 
368         to find the entry with a key based only on the message type.
369 */
370 static inline rtable_ent_t*  uta_get_rte( route_table_t *rt, int sid, int mtype, int try_alt ) {
371         uint64_t key;                   // key is sub id and mtype banged together
372         rtable_ent_t* rte;              // the entry we found
373
374         if( rt == NULL || rt->hash == NULL ) {
375                 return NULL;
376         }
377
378         key = build_rt_key( sid, mtype );                                                                                       // first try with a 'full' key
379         if( ((rte = rmr_sym_pull( rt->hash, key )) != NULL)  ||  ! try_alt ) {          // found or not allowed to try the alternate, return what we have
380                 return rte;
381         }
382
383         if( sid != UNSET_SUBID ) {                                                              // not found, and allowed to try alternate; and the sub_id was set
384                 key = build_rt_key( UNSET_SUBID, mtype );                       // rebuild key
385                 rte = rmr_sym_pull( rt->hash, key );                            // see what we get with this
386         }
387
388         return rte;
389 }
390
391 /*
392         Return a string of count information. E.g.:
393                 <ep-name>:<port> <good> <hard-fail> <soft-fail>
394
395         Caller must free the string allocated if a buffer was not provided.
396
397         Pointer returned is to a freshly allocated string, or the user buffer
398         for convenience.
399
400         If the endpoint passed is a nil pointer, then we return a nil -- caller
401         must check!
402 */
403 static inline char* get_ep_counts( endpoint_t* ep, char* ubuf, int ubuf_len ) {
404         char*   rs;                     // result string
405
406         if( ep == NULL ) {
407                 return NULL;
408         }
409
410         if( ubuf != NULL ) {
411                 rs = ubuf;
412         } else {
413                 ubuf_len = 256;
414                 rs = malloc( sizeof( char ) * ubuf_len );
415         }
416
417         snprintf( rs, ubuf_len, "%s %lld %lld %lld", ep->name, ep->scounts[EPSC_GOOD], ep->scounts[EPSC_FAIL], ep->scounts[EPSC_TRANS] );
418
419         return rs;
420 }
421
422 #endif