Allow user programme to set RMR verbosity level
[ric-plt/lib/rmr.git] / src / rmr / nng / src / rtable_nng_static.c
1 // vim: ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rtable_nng_static.c
23         Abstract:       Route table management functions which depend on the underlying
24                                 transport mechanism and thus cannot be included with the generic
25                                 route table functions.
26
27                                 This module is designed to be included by any module (main) needing
28                                 the static/private stuff.
29
30         Author:         E. Scott Daniels
31         Date:           29 November 2018
32 */
33
34 #ifndef rtable_static_c
35 #define rtable_static_c
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <netdb.h>
40 #include <errno.h>
41 #include <string.h>
42 #include <errno.h>
43 #include <fcntl.h>
44 #include <sys/types.h>
45 #include <sys/stat.h>
46 #include <unistd.h>
47
48
49 // -----------------------------------------------------------------------------------------------------
50
51 /*
52         Establish a TCP connection to the indicated target (IP address).
53         Target assumed to be address:port.  The new socket is returned via the
54         user supplied pointer so that a success/fail code is returned directly.
55         Return value is 0 (false) on failure, 1 (true)  on success.
56
57         In order to support multi-threaded user applications we must hold a lock before
58         we attempt to create a dialer and connect. NNG is thread safe, but we can
59         get things into a bad state if we allow a collision here.  The lock grab
60         only happens on the intial session setup.
61 */
62 static int uta_link2( endpoint_t* ep ) {
63         static int      flags = -1;
64
65         char*           target;
66         nng_socket*     nn_sock;
67         nng_dialer*     dialer;
68         char            conn_info[NNG_MAXADDRLEN];      // string to give to nano to make the connection
69         char*           addr;
70         int                     state = FALSE;
71         char*           tok;
72
73         if( ep == NULL ) {
74                 return FALSE;
75         }
76
77         if( flags < 0 ) {
78                 tok = getenv( "RMR_ASYNC_CONN" );
79                 if( tok == NULL || *tok == '1' ) {
80                         flags = NNG_FLAG_NONBLOCK;                              // start dialer asynch
81                 } else {
82                         flags = NO_FLAGS;
83                 }
84         }
85
86         target = ep->name;                              // always give name to transport so chaning dest IP does not break reconnect
87         nn_sock = &ep->nn_sock;
88         dialer = &ep->dialer;
89
90         if( target == NULL  ||  (addr = strchr( target, ':' )) == NULL ) {              // bad address:port
91                 fprintf( stderr, "rmr: link2: unable to create link: bad target: %s\n", target == NULL ? "<nil>" : target );
92                 return FALSE;
93         }
94
95         if( nn_sock == NULL ) {
96                 errno = EINVAL;
97                 return FALSE;
98         }
99
100         pthread_mutex_lock( &ep->gate );                        // grab the lock
101         if( ep->open ) {
102                 pthread_mutex_unlock( &ep->gate );
103                 return TRUE;
104         }
105
106
107         if( nng_push0_open( nn_sock ) != 0 ) {                  // and assign the mode
108                 pthread_mutex_unlock( &ep->gate );
109                 rmr_vlog( RMR_VL_CRIT, "rmr: link2: unable to initialise nanomsg push socket to: %s\n", target );
110                 return FALSE;
111         }
112
113         snprintf( conn_info, sizeof( conn_info ), "tcp://%s", target );
114         if( (state = nng_dialer_create( dialer, *nn_sock, conn_info )) != 0 ) {
115                 pthread_mutex_unlock( &ep->gate );
116                 rmr_vlog( RMR_VL_WARN, "rmr: link2: unable to create dialer for link to target: %s: %d\n", target, errno );
117                 nng_close( *nn_sock );
118                 return FALSE;
119         }
120
121         nng_dialer_setopt_ms( *dialer,  NNG_OPT_RECONNMAXT, 2000 );             // cap backoff on retries to reasonable amount (2s)
122         nng_dialer_setopt_ms( *dialer,  NNG_OPT_RECONNMINT, 100 );              // start retry 100m after last failure with 2s cap
123
124         if( (state = nng_dialer_start( *dialer, flags )) != 0 ) {                                               // can fail immediatly (unlike nanomsg)
125                 pthread_mutex_unlock( &ep->gate );
126                 rmr_vlog( RMR_VL_WARN, "rmr: unable to create link to target: %s: %s\n", target, nng_strerror( state ) );
127                 nng_close( *nn_sock );
128                 return FALSE;
129         }
130
131         if( DEBUG ) rmr_vlog( RMR_VL_INFO, "rmr_link2l: dial was successful: %s\n", target );
132
133         ep->open = TRUE;                                                // must set before release
134         pthread_mutex_unlock( &ep->gate );
135         return TRUE;
136 }
137
138 /*
139         This provides a protocol independent mechanism for establishing the connection to an endpoint.
140         Return is true (1) if the link was opened; false on error.
141
142         For some flavours, the context is needed by this function, but not for nng.
143 */
144 static int rt_link2_ep( void* vctx, endpoint_t* ep ) {
145         if( ep == NULL ) {
146                 return FALSE;
147         }
148
149         if( ep->open )  {                       // already open, do nothing
150                 return TRUE;
151         }
152
153         uta_link2( ep );
154         return ep->open;
155 }
156
157
158 /*
159         Add an endpoint to a route table entry for the group given. If the endpoint isn't in the
160         hash we add it and create the endpoint struct.
161
162         The caller must supply the specific route table (we assume it will be pending, but they
163         could live on the edge and update the active one, though that's not at all a good idea).
164 */
165 extern endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group  ) {
166         endpoint_t*     ep;
167         rrgroup_t* rrg;                         // pointer at group to update
168
169         if( ! rte || ! rt ) {
170                 rmr_vlog( RMR_VL_WARN, "uda_add_ep didn't get a valid rt and/or rte pointer\n" );
171                 return NULL;
172         }
173
174         if( rte->nrrgroups <= group ) {
175                 rmr_vlog( RMR_VL_WARN, "uda_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups );
176                 return NULL;
177         }
178
179         if( (rrg = rte->rrgroups[group]) == NULL ) {
180                 if( (rrg = (rrgroup_t *) malloc( sizeof( *rrg ) )) == NULL ) {
181                         rmr_vlog( RMR_VL_WARN, "rmr_add_ep: malloc failed for round robin group: group=%d\n", group );
182                         return NULL;
183                 }
184                 memset( rrg, 0, sizeof( *rrg ) );
185
186                 if( (rrg->epts = (endpoint_t **) malloc( sizeof( endpoint_t ) * MAX_EP_GROUP )) == NULL ) {
187                         rmr_vlog( RMR_VL_WARN, "rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group );
188                         return NULL;
189                 }
190                 memset( rrg->epts, 0, sizeof( endpoint_t ) * MAX_EP_GROUP );
191
192                 rte->rrgroups[group] = rrg;
193
194                 rrg->ep_idx = 0;                                                // next endpoint to send to
195                 rrg->nused = 0;                                                 // number populated
196                 rrg->nendpts = MAX_EP_GROUP;                    // number allocated
197         }
198
199         ep = rt_ensure_ep( rt, ep_name );                       // get the ep and create one if not known
200
201         if( rrg != NULL ) {
202                 if( rrg->nused >= rrg->nendpts ) {
203                         // future: reallocate
204                         rmr_vlog( RMR_VL_WARN, "endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group );
205                         return NULL;
206                 }
207
208                 rrg->epts[rrg->nused] = ep;
209                 rrg->nused++;
210         }
211
212         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "endpoint added to mtype/group: %d/%d %s\n", rte->mtype, group, ep_name );
213         return ep;
214 }
215
216
217 /*
218         Given a name, find the nano socket needed to send to it. Returns the socket via
219         the user pointer passed in and sets the return value to true (1). If the
220         endpoint cannot be found false (0) is returned.
221 */
222 static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_sock, endpoint_t** uepp ) {
223         endpoint_t* ep;
224         int state = FALSE;
225
226         if( rt == NULL ) {
227                 return FALSE;
228         }
229
230         ep =  rmr_sym_get( rt->hash, ep_name, 1 );
231         if( uepp != NULL ) {                                                    // caller needs endpoint too, give it back
232                 *uepp = ep;
233         }
234         if( ep == NULL ) {
235                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "get ep by name for %s not in hash!\n", ep_name );
236                 if( ! ep_name || (ep = rt_ensure_ep( rt, ep_name)) == NULL ) {                          // create one if not in rt (support rts without entry in our table)
237                         return FALSE;
238                 }
239         }
240
241         if( ! ep->open )  {                                                                             // not open -- connect now
242                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "get ep by name for %s session not started... starting\n", ep_name );
243                 if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
244                         ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
245                 }
246                 if( uta_link2( ep ) ) {                                                                                 // find entry in table and create link
247                         state = TRUE;
248                         ep->open = TRUE;
249                         *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
250                 }
251                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_bn: connection state: %s %s\n", state ? "[OK]" : "[FAIL]", ep->name );
252         } else {
253                 *nn_sock = ep->nn_sock;
254                 state = TRUE;
255         }
256
257         return state;
258 }
259
260 /*
261         Make a round robin selection within a round robin group for a route table
262         entry. Returns the nanomsg socket if there is a rte for the message
263         key, and group is defined. Socket is returned via pointer in the parm
264         list (nn_sock).
265
266         The group is the group number to select from.
267
268         The user supplied (via pointer to) integer 'more' will be set if there are
269         additional groups beyond the one selected. This allows the caller to
270         to easily iterate over the group list -- more is set when the group should
271         be incremented and the function invoked again. Groups start at 0.
272
273         The return value is true (>0) if the socket was found and *nn_sock was updated
274         and false (0) if there is no associated socket for the msg type, group combination.
275         We return the index+1 from the round robin table on success so that we can verify
276         during test that different entries are being seleted; we cannot depend on the nng
277         socket being different as we could with nano.
278
279         NOTE:   The round robin selection index increment might collide with other
280                 threads if multiple threads are attempting to send to the same round
281                 robin group; the consequences are small and avoid locking. The only side
282                 effect is either sending two messages in a row to, or skipping, an endpoint.
283                 Both of these, in the grand scheme of things, is minor compared to the
284                 overhead of grabbing a lock on each call.
285 */
286 static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, nng_socket* nn_sock, endpoint_t** uepp ) {
287         endpoint_t*     ep;                             // seected end point
288         int  state = FALSE;                     // processing state
289         int dummy;
290         rrgroup_t* rrg;
291         int     idx;
292
293
294         if( ! more ) {                          // eliminate cheks each time we need to use
295                 more = &dummy;
296         }
297
298         if( ! nn_sock ) {                       // user didn't supply a pointer
299                 errno = EINVAL;
300                 *more = 0;
301                 return FALSE;
302         }
303
304         if( rte == NULL ) {
305                 *more = 0;
306                 return FALSE;
307         }
308
309         if( group < 0 || group >= rte->nrrgroups ) {
310                 //if( DEBUG ) fprintf( stderr, ">>>> group out of range: group=%d max=%d\n", group, rte->nrrgroups );
311                 *more = 0;
312                 return FALSE;
313         }
314
315         if( (rrg = rte->rrgroups[group]) == NULL ) {
316                 //if( DEBUG ) fprintf( stderr, ">>>> rrg not found for group \n", group );
317                 *more = 0;                                      // groups are inserted contig, so nothing should be after a nil pointer
318                 return FALSE;
319         }
320
321         *more = group < rte->nrrgroups-1 ? (rte->rrgroups[group+1] != NULL): 0; // more if something in next group slot
322
323         switch( rrg->nused ) {
324                 case 0:                         // nothing allocated, just punt
325                         //if( DEBUG ) fprintf( stderr, ">>>> nothing allocated for the rrg\n" );
326                         return FALSE;
327
328                 case 1:                         // exactly one, no rr to deal with
329                         ep = rrg->epts[0];
330                         //if( DEBUG ) fprintf( stderr, ">>>> _rr returning socket with one choice in group \n" );
331                         state = TRUE;
332                         break;
333
334                 default:                                                                                // need to pick one and adjust rr counts
335
336                         idx = rrg->ep_idx++ % rrg->nused;                       // see note above
337                         ep = rrg->epts[idx];                                            // select next endpoint
338                         //if( DEBUG ) fprintf( stderr, ">>>> _rr returning socket with multiple choices in group idx=%d \n", rrg->ep_idx );
339                         state = idx + 1;                                                        // unit test checks to see that we're cycling through, so must not just be TRUE
340                         break;
341         }
342
343         if( uepp != NULL ) {                                                            // caller needs refernce to endpoint too
344                 *uepp = ep;
345         }
346         if( state ) {                                                                           // end point selected, open if not, get socket either way
347                 if( ! ep->open ) {                                                              // not connected
348                         if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
349                                 ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
350                         }
351
352                         if( uta_link2( ep ) ) {                                                                                 // find entry in table and create link
353                                 ep->open = TRUE;
354                                 *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
355                         } else {
356                                 state = FALSE;
357                         }
358                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" );
359                 } else {
360                         *nn_sock = ep->nn_sock;
361                 }
362         }
363
364         return state;
365 }
366
367 /*
368         Finds the rtable entry which matches the key. Returns a nil pointer if
369         no entry is found. If try_alternate is set, then we will attempt 
370         to find the entry with a key based only on the message type.
371 */
372 static inline rtable_ent_t*  uta_get_rte( route_table_t *rt, int sid, int mtype, int try_alt ) {
373         uint64_t key;                   // key is sub id and mtype banged together
374         rtable_ent_t* rte;              // the entry we found
375
376         if( rt == NULL || rt->hash == NULL ) {
377                 return NULL;
378         }
379
380         key = build_rt_key( sid, mtype );                                                                                       // first try with a 'full' key
381         if( ((rte = rmr_sym_pull( rt->hash, key )) != NULL)  ||  ! try_alt ) {          // found or not allowed to try the alternate, return what we have
382                 return rte;
383         }
384
385         if( sid != UNSET_SUBID ) {                                                              // not found, and allowed to try alternate; and the sub_id was set
386                 key = build_rt_key( UNSET_SUBID, mtype );                       // rebuild key
387                 rte = rmr_sym_pull( rt->hash, key );                            // see what we get with this
388         }
389
390         return rte;
391 }
392
393 /*
394         Given a route table and meid string, find the owner (if known). Returns a pointer to
395         the endpoint struct or nil.
396 */
397 static inline endpoint_t*  get_meid_owner( route_table_t *rt, char* meid ) {
398         endpoint_t* ep;         // the ep we found in the hash
399
400         if( rt == NULL || rt->hash == NULL || meid == NULL || *meid == 0 ) {
401                 return NULL;
402         }
403
404         return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE ); 
405 }
406
407 /*
408         Return a string of count information. E.g.:
409                 <ep-name>:<port> <good> <hard-fail> <soft-fail>
410
411         Caller must free the string allocated if a buffer was not provided.
412
413         Pointer returned is to a freshly allocated string, or the user buffer
414         for convenience.
415
416         If the endpoint passed is a nil pointer, then we return a nil -- caller
417         must check!
418 */
419 static inline char* get_ep_counts( endpoint_t* ep, char* ubuf, int ubuf_len ) {
420         char*   rs;                     // result string
421
422         if( ep == NULL ) {
423                 return NULL;
424         }
425
426         if( ubuf != NULL ) {
427                 rs = ubuf;
428         } else {
429                 ubuf_len = 256;
430                 rs = malloc( sizeof( char ) * ubuf_len );
431         }
432
433         snprintf( rs, ubuf_len, "%s %lld %lld %lld", ep->name, ep->scounts[EPSC_GOOD], ep->scounts[EPSC_FAIL], ep->scounts[EPSC_TRANS] );
434
435         return rs;
436 }
437
438 /*
439         Given a message, use the meid field to find the owner endpoint for the meid.
440         The owner ep is then used to extract the socket through which the message
441         is sent. This returns TRUE if we found a socket and it was written to the
442         nn_sock pointer; false if we didn't.
443
444         We've been told that the meid is a string, thus we count on it being a nil
445         terminated set of bytes.
446 */
447 static int epsock_meid( route_table_t *rtable, rmr_mbuf_t* msg, nng_socket* nn_sock, endpoint_t** uepp ) {
448         endpoint_t*     ep;                             // seected end point
449         int     state = FALSE;                  // processing state
450         char*   meid;
451
452
453         errno = 0;
454         if( ! nn_sock || msg == NULL || rtable == NULL ) {                      // missing stuff; bail fast
455                 errno = EINVAL;
456                 return FALSE;
457         }
458
459         meid = ((uta_mhdr_t *) msg->header)->meid;
460
461         if( (ep = get_meid_owner( rtable, meid )) == NULL ) {
462                 if( uepp != NULL ) {                                                            // caller needs refernce to endpoint too
463                         *uepp = NULL;
464                 }
465
466                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_meid: no ep in hash for (%s)\n", meid );
467                 return FALSE;
468         }
469
470         state = TRUE;
471         if( ! ep->open ) {                                                              // not connected
472                 if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
473                         ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
474                 }
475
476                 if( uta_link2( ep ) ) {                                         // find entry in table and create link
477                         ep->open = TRUE;
478                         *nn_sock = ep->nn_sock;                                 // pass socket back to caller
479                 } else {
480                         state = FALSE;
481                 }
482                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_meid: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" );
483         } else {
484                 *nn_sock = ep->nn_sock;
485         }
486
487         return state;
488 }
489
490 #endif