Address potential error state after good send
[ric-plt/lib/rmr.git] / src / rmr / nng / src / rtable_nng_static.c
1 // : vi ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rtable_nng_static.c
23         Abstract:       Route table management functions which depend on the underlying
24                                 transport mechanism and thus cannot be included with the generic
25                                 route table functions.
26
27                                 This module is designed to be included by any module (main) needing
28                                 the static/private stuff.
29
30         Author:         E. Scott Daniels
31         Date:           29 November 2018
32 */
33
34 #ifndef rtable_static_c
35 #define rtable_static_c
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <netdb.h>
40 #include <errno.h>
41 #include <string.h>
42 #include <errno.h>
43 #include <fcntl.h>
44 #include <sys/types.h>
45 #include <sys/stat.h>
46 #include <unistd.h>
47
48
49 // -----------------------------------------------------------------------------------------------------
50
51 /*
52         Establish a TCP connection to the indicated target (IP address).
53         Target assumed to be address:port.  The new socket is returned via the
54         user supplied pointer so that a success/fail code is returned directly.
55         Return value is 0 (false) on failure, 1 (true)  on success.
56
57         In order to support multi-threaded user applications we must hold a lock before
58         we attempt to create a dialer and connect. NNG is thread safe, but we can
59         get things into a bad state if we allow a collision here.  The lock grab
60         only happens on the intial session setup.
61 */
62 //static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer, pthread_mutex* gate ) {
63 static int uta_link2( endpoint_t* ep ) {
64         char*           target;
65         nng_socket*     nn_sock;
66         nng_dialer*     dialer;
67         char            conn_info[NNG_MAXADDRLEN];      // string to give to nano to make the connection
68         char*           addr;
69         int                     state = FALSE;
70
71         if( ep == NULL ) {
72                 return FALSE;
73         }
74
75         target = ep->addr;
76         nn_sock = &ep->nn_sock;
77         dialer = &ep->dialer;
78
79         if( target == NULL  ||  (addr = strchr( target, ':' )) == NULL ) {              // bad address:port
80                 fprintf( stderr, "rmr: link2: unable to create link: bad target: %s\n", target == NULL ? "<nil>" : target );
81                 return FALSE;
82         }
83
84         if( nn_sock == NULL ) {
85                 errno = EINVAL;
86                 return FALSE;
87         }
88
89         pthread_mutex_lock( &ep->gate );                        // grab the lock
90         if( ep->open ) {
91                 pthread_mutex_unlock( &ep->gate );
92                 return TRUE;
93         }
94
95
96         if( nng_push0_open( nn_sock ) != 0 ) {                  // and assign the mode
97                 pthread_mutex_unlock( &ep->gate );
98                 fprintf( stderr, "[CRI] rmr: link2: unable to initialise nanomsg push socket to: %s\n", target );
99                 return FALSE;
100         }
101
102         snprintf( conn_info, sizeof( conn_info ), "tcp://%s", target );
103         if( (state = nng_dialer_create( dialer, *nn_sock, conn_info )) != 0 ) {
104                 pthread_mutex_unlock( &ep->gate );
105                 fprintf( stderr, "[WRN] rmr: link2: unable to create dialer for link to target: %s: %d\n", target, errno );
106                 nng_close( *nn_sock );
107                 return FALSE;
108         }
109
110         nng_dialer_setopt_ms( *dialer,  NNG_OPT_RECONNMAXT, 2000 );             // cap backoff on retries to reasonable amount (2s)
111         nng_dialer_setopt_ms( *dialer,  NNG_OPT_RECONNMINT, 100 );              // start retry 100m after last failure with 2s cap
112
113         if( (state = nng_dialer_start( *dialer, NO_FLAGS )) != 0 ) {                                            // can fail immediatly (unlike nanomsg)
114                 pthread_mutex_unlock( &ep->gate );
115                 fprintf( stderr, "[WRN] rmr: unable to create link to target: %s: %s\n", target, nng_strerror( state ) );
116                 nng_close( *nn_sock );
117                 return FALSE;
118         }
119
120         if( DEBUG ) fprintf( stderr, "[INFO] rmr_link2l: dial was successful: %s\n", target );
121
122         ep->open = TRUE;                                                // must set before release
123         pthread_mutex_unlock( &ep->gate );
124         return TRUE;
125 }
126
127 /*
128         This provides a protocol independent mechanism for establishing the connection to an endpoint.
129         Return is true (1) if the link was opened; false on error.
130 */
131 static int rt_link2_ep( endpoint_t* ep ) {
132         if( ep == NULL ) {
133                 return FALSE;
134         }
135
136         if( ep->open )  {                       // already open, do nothing
137                 return TRUE;
138         }
139
140         uta_link2( ep );
141         return ep->open;
142 }
143
144
145 /*
146         Add an endpoint to a route table entry for the group given. If the endpoint isn't in the
147         hash we add it and create the endpoint struct.
148
149         The caller must supply the specific route table (we assume it will be pending, but they
150         could live on the edge and update the active one, though that's not at all a good idea).
151 */
152 extern endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group  ) {
153         endpoint_t*     ep;
154         rrgroup_t* rrg;                         // pointer at group to update
155
156         if( ! rte || ! rt ) {
157                 fprintf( stderr, "[WRN] uda_add_ep didn't get a valid rt and/or rte pointer\n" );
158                 return NULL;
159         }
160
161         if( rte->nrrgroups <= group ) {
162                 fprintf( stderr, "[WRN] uda_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups );
163                 return NULL;
164         }
165
166         if( (rrg = rte->rrgroups[group]) == NULL ) {
167                 if( (rrg = (rrgroup_t *) malloc( sizeof( *rrg ) )) == NULL ) {
168                         fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for round robin group: group=%d\n", group );
169                         return NULL;
170                 }
171                 memset( rrg, 0, sizeof( *rrg ) );
172
173                 if( (rrg->epts = (endpoint_t **) malloc( sizeof( endpoint_t ) * MAX_EP_GROUP )) == NULL ) {
174                         fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group );
175                         return NULL;
176                 }
177                 memset( rrg->epts, 0, sizeof( endpoint_t ) * MAX_EP_GROUP );
178
179                 rte->rrgroups[group] = rrg;
180
181                 rrg->ep_idx = 0;                                                // next endpoint to send to
182                 rrg->nused = 0;                                                 // number populated
183                 rrg->nendpts = MAX_EP_GROUP;                    // number allocated
184         }
185
186         ep = rt_ensure_ep( rt, ep_name );                       // get the ep and create one if not known
187
188         if( rrg != NULL ) {
189                 if( rrg->nused >= rrg->nendpts ) {
190                         // future: reallocate
191                         fprintf( stderr, "[WRN] endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group );
192                         return NULL;
193                 }
194
195                 rrg->epts[rrg->nused] = ep;
196                 rrg->nused++;
197         }
198
199         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] endpoint added to mtype/group: %d/%d %s\n", rte->mtype, group, ep_name );
200         return ep;
201 }
202
203
204 /*
205         Given a name, find the nano socket needed to send to it. Returns the socket via
206         the user pointer passed in and sets the return value to true (1). If the
207         endpoint cannot be found false (0) is returned.
208 */
209 static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_sock ) {
210         endpoint_t* ep;
211         int state = FALSE;
212
213         if( rt == NULL ) {
214                 return FALSE;
215         }
216
217         ep =  rmr_sym_get( rt->hash, ep_name, 1 );
218         if( ep == NULL ) {
219                 if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s not in hash!\n", ep_name );
220                 if( ! ep_name || (ep = rt_ensure_ep( rt, ep_name)) == NULL ) {                          // create one if not in rt (support rts without entry in our table)
221                         return FALSE;
222                 }
223         }
224
225         if( ! ep->open )  {                                                                             // not open -- connect now
226                 if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s session not started... starting\n", ep_name );
227                 if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
228                         ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
229                 }
230                 if( uta_link2( ep ) ) {                                                                                 // find entry in table and create link
231                         state = TRUE;
232                         ep->open = TRUE;
233                         *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
234                 }
235                 if( DEBUG ) fprintf( stderr, "[DBUG] epsock_bn: connection state: %s %s\n", state ? "[OK]" : "[FAIL]", ep->name );
236         } else {
237                 *nn_sock = ep->nn_sock;
238                 state = TRUE;
239         }
240
241         return state;
242 }
243
244 /*
245         Make a round robin selection within a round robin group for a route table
246         entry. Returns the nanomsg socket if there is a rte for the message
247         key, and group is defined. Socket is returned via pointer in the parm
248         list (nn_sock).
249
250         The group is the group number to select from.
251
252         The user supplied (via pointer to) integer 'more' will be set if there are
253         additional groups beyond the one selected. This allows the caller to
254         to easily iterate over the group list -- more is set when the group should
255         be incremented and the function invoked again. Groups start at 0.
256
257         The return value is true (>0) if the socket was found and *nn_sock was updated
258         and false (0) if there is no associated socket for the msg type, group combination.
259         We return the index+1 from the round robin table on success so that we can verify
260         during test that different entries are being seleted; we cannot depend on the nng
261         socket being different as we could with nano.
262
263         NOTE:   The round robin selection index increment might collide with other
264                 threads if multiple threads are attempting to send to the same round
265                 robin group; the consequences are small and avoid locking. The only side
266                 effect is either sending two messages in a row to, or skipping, an endpoint.
267                 Both of these, in the grand scheme of things, is minor compared to the
268                 overhead of grabbing a lock on each call.
269 */
270 static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, nng_socket* nn_sock ) {
271         //rtable_ent_t* rte;                    // matching rt entry
272         endpoint_t*     ep;                             // seected end point
273         int  state = FALSE;                     // processing state
274         int dummy;
275         rrgroup_t* rrg;
276         int     idx;
277
278
279         if( ! more ) {                          // eliminate cheks each time we need to use
280                 more = &dummy;
281         }
282
283         if( ! nn_sock ) {                       // user didn't supply a pointer
284                 errno = EINVAL;
285                 *more = 0;
286                 return FALSE;
287         }
288
289         if( rte == NULL ) {
290                 *more = 0;
291                 return FALSE;
292         }
293
294         if( group < 0 || group >= rte->nrrgroups ) {
295                 //if( DEBUG ) fprintf( stderr, ">>>> group out of range: group=%d max=%d\n", group, rte->nrrgroups );
296                 *more = 0;
297                 return FALSE;
298         }
299
300         if( (rrg = rte->rrgroups[group]) == NULL ) {
301                 //if( DEBUG ) fprintf( stderr, ">>>> rrg not found for group \n", group );
302                 *more = 0;                                      // groups are inserted contig, so nothing should be after a nil pointer
303                 return FALSE;
304         }
305
306         *more = group < rte->nrrgroups-1 ? (rte->rrgroups[group+1] != NULL): 0; // more if something in next group slot
307
308         switch( rrg->nused ) {
309                 case 0:                         // nothing allocated, just punt
310                         //if( DEBUG ) fprintf( stderr, ">>>> nothing allocated for the rrg\n" );
311                         return FALSE;
312
313                 case 1:                         // exactly one, no rr to deal with
314                         ep = rrg->epts[0];
315                         //if( DEBUG ) fprintf( stderr, ">>>> _rr returning socket with one choice in group \n" );
316                         state = TRUE;
317                         break;
318
319                 default:                                                                                // need to pick one and adjust rr counts
320
321                         idx = rrg->ep_idx++ % rrg->nused;                       // see note above
322                         ep = rrg->epts[idx];                                            // select next endpoint
323                         //if( DEBUG ) fprintf( stderr, ">>>> _rr returning socket with multiple choices in group idx=%d \n", rrg->ep_idx );
324                         state = idx + 1;                                                        // unit test checks to see that we're cycling through, so must not just be TRUE
325                         break;
326         }
327
328         if( state ) {                                                                           // end point selected, open if not, get socket either way
329                 if( ! ep->open ) {                                                              // not connected
330                         if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
331                                 ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
332                         }
333
334                         if( uta_link2( ep ) ) {                                                                                 // find entry in table and create link
335                                 ep->open = TRUE;
336                                 *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
337                         } else {
338                                 state = FALSE;
339                         }
340                         if( DEBUG ) fprintf( stderr, "[DBUG] epsock_rr: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" );
341                 } else {
342                         *nn_sock = ep->nn_sock;
343                 }
344         }
345
346         return state;
347 }
348
349 /*
350         Finds the rtable entry which matches the key. Returns a nil pointer if
351         no entry is found. If try_alternate is set, then we will attempt 
352         to find the entry with a key based only on the message type.
353 */
354 static inline rtable_ent_t*  uta_get_rte( route_table_t *rt, int sid, int mtype, int try_alt ) {
355         uint64_t key;                   // key is sub id and mtype banged together
356         rtable_ent_t* rte;              // the entry we found
357
358         if( rt == NULL || rt->hash == NULL ) {
359                 return NULL;
360         }
361
362         key = build_rt_key( sid, mtype );                                                                                       // first try with a 'full' key
363         if( ((rte = rmr_sym_pull( rt->hash, key )) != NULL)  ||  ! try_alt ) {          // found or not allowed to try the alternate, return what we have
364                 return rte;
365         }
366
367         if( sid != UNSET_SUBID ) {                                                              // not found, and allowed to try alternate; and the sub_id was set
368                 key = build_rt_key( UNSET_SUBID, mtype );                       // rebuild key
369                 rte = rmr_sym_pull( rt->hash, key );                            // see what we get with this
370         }
371
372         return rte;
373 }
374
375 #endif