1 // vim: ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019-2020 Nokia
5 Copyright (c) 2018-2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: rtable_si_static.c
23 Abstract: Route table management functions which depend on the underlying
24 transport mechanism and thus cannot be included with the generic
25 route table functions.
27 This module is designed to be included by any module (main) needing
28 the static/private stuff.
30 Author: E. Scott Daniels
31 Date: 29 November 2018
34 #ifndef rtable_static_c
35 #define rtable_static_c
44 #include <sys/types.h>
49 // -----------------------------------------------------------------------------------------------------
52 Mark an endpoint closed because it's in a failing state.
// uta_ep_failed: called for an endpoint whose connection is in a failing state.
//	ep	endpoint being reported; its name is written to the debug trace.
// NOTE(review): only the debug trace is visible for this function; any change made to
// the endpoint state (for example an open flag) must be confirmed against the complete body.
54 static void uta_ep_failed( endpoint_t* ep ) {
// when DEBUG is non-zero, record which endpoint had its connection closed
56 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "connection to %s was closed\n", ep->name );
62 Establish a TCP connection to the indicated target (IP address).
63 Target assumed to be address:port. The new socket is returned via the
64 user supplied pointer so that a success/fail code is returned directly.
65 Return value is 0 (false) on failure, 1 (true) on success.
67 In order to support multi-threaded user applications we must hold a lock before
68 we attempt to create a dialer and connect. NNG is thread safe, but we can
69 get things into a bad state if we allow a collision here. The lock grab
70 only happens on the initial session setup.
// uta_link2: connect an endpoint to its target through the SI transport.
//	si_ctx	transport context handed to SIconnect().
//	ep	endpoint to connect; ep->name supplies the target (expected as address:port),
//		ep->gate is held around the attempt, ep->nn_sock receives the SIconnect() result
//		and ep->open is set to TRUE on success.
// NOTE(review): the return statements are not visible here; the success/failure value
// handed to the caller should be confirmed against the complete function.
72 static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) {
76 char conn_info[SI_MAX_ADDR_LEN]; // string handed to SIconnect() to make the connection
// debug trace for a nil endpoint (NOTE(review): the test guarding this line is not visible)
82 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "link2 ep was nil!\n" );
86 target = ep->name; // always give name to transport so changing dest IP does not break reconnect
// the target is usable only when it is non-nil and contains a ':' between address and port
87 if( target == NULL || (addr = strchr( target, ':' )) == NULL ) { // bad address:port
89 rmr_vlog( RMR_VL_WARN, "rmr: link2: unable to create link: bad target: %s\n", target == NULL ? "<nil>" : target );
// hold the endpoint gate while the connection is attempted so that concurrent callers cannot collide
95 pthread_mutex_lock( &ep->gate ); // grab the lock
// NOTE(review): early release of the gate; the condition guarding it is not visible --
// presumably the endpoint was opened by another thread while this one waited; confirm
97 pthread_mutex_unlock( &ep->gate );
// copy the target into the local buffer; snprintf() bounds the copy to sizeof( conn_info )
101 snprintf( conn_info, sizeof( conn_info ), "%s", target );
103 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "link2 attempting connection with: %s\n", conn_info );
// the SIconnect() result is stored in ep->nn_sock; a negative value is treated as failure
104 if( (ep->nn_sock = SIconnect( si_ctx, conn_info )) < 0 ) {
// failure path: the gate is released before the failure is reported
105 pthread_mutex_unlock( &ep->gate );
// NOTE(review): errno is read below after pthread_mutex_unlock(); capturing it immediately
// after SIconnect() fails would guard against it being overwritten -- confirm whether that matters
107 if( ep->notify ) { // need to notify if set
108 rmr_vlog( RMR_VL_WARN, "rmr: link2: unable to connect to target: %s: %d %s\n", target, errno, strerror( errno ) );
111 //nng_close( *nn_sock );
115 if( DEBUG ) rmr_vlog( RMR_VL_INFO, "rmr_si_link2: connection was successful to: %s\n", target );
117 ep->open = TRUE; // set open/notify before giving up lock
// the success message is written only when notify is clear, i.e. after a failure was reported
// NOTE(review): the assignments that clear and reset ep->notify are not visible; confirm
119 if( ! ep->notify ) { // if we yammered about a failure, indicate finally good
120 rmr_vlog( RMR_VL_INFO, "rmr: link2: connection finally establisehd with target: %s\n", target );
// success path: release the gate once the endpoint state has been updated
124 pthread_mutex_unlock( &ep->gate );
129 This provides a protocol independent mechanism for establishing the connection to an endpoint.
130 Return is true (1) if the link was opened; false on error.
// rt_link2_ep: transport independent entry point for opening the connection to an endpoint.
//	vctx	caller context passed as void*; it is cast to uta_ctx_t* before use.
//	ep	endpoint to open.
// NOTE(review): the return statements and any nil check on ep are not visible here;
// confirm the exact true/false results against the complete function.
132 static int rt_link2_ep( void* vctx, endpoint_t* ep ) {
139 if( ep->open ) { // already open, do nothing
// recover the typed context; a nil context is handled inside this branch
143 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
// attempt the connection with the SI transport handle held by the context
// NOTE(review): the value returned by uta_link2() is not used on this line; whether the
// fd2ep_add() call below is conditional on the link being open cannot be seen here -- confirm
147 uta_link2( ctx->si_ctx, ep );
149 fd2ep_add( ctx, ep->nn_sock, ep ); // map fd to ep for disc cleanup
156 Add an endpoint to a route table entry for the group given. If the endpoint isn't in the
157 hash we add it and create the endpoint struct.
159 The caller must supply the specific route table (we assume it will be pending, but they
160 could live on the edge and update the active one, though that's not at all a good idea).
// uta_add_ep: add the named endpoint to a round robin group of a route table entry.
//	rt	route table used to find (or create) the endpoint through rt_ensure_ep().
//	rte	route table entry whose rrgroups[group] slot is updated.
//	ep_name	name of the endpoint to add.
//	group	index into rte->rrgroups; accepted only when 0 <= group < rte->nrrgroups.
// The group structure and its endpoint pointer array are allocated the first time a slot is used.
// NOTE(review): the return statements are not visible here; confirm what is handed back on
// each warning path and on success.
162 extern endpoint_t* uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group ) {
164 rrgroup_t* rrg; // pointer at group to update
// both the table and the entry are required
// NOTE(review): the message prefix reads uda_add_ep, which looks like a typo for uta_add_ep
166 if( ! rte || ! rt ) {
167 rmr_vlog( RMR_VL_WARN, "uda_add_ep didn't get a valid rt and/or rte pointer\n" );
// reject a group index outside of the slots held by the entry
171 if( rte->nrrgroups <= group || group < 0 ) {
172 rmr_vlog( RMR_VL_WARN, "uda_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups );
176 //fprintf( stderr, ">>>> add ep grp=%d to rte @ 0x%p rrg=%p\n", group, rte, rte->rrgroups[group] );
// first use of this slot: build the group and its endpoint array
177 if( (rrg = rte->rrgroups[group]) == NULL ) {
178 if( (rrg = (rrgroup_t *) malloc( sizeof( *rrg ) )) == NULL ) {
179 rmr_vlog( RMR_VL_WARN, "rmr_add_ep: malloc failed for round robin group: group=%d\n", group );
182 memset( rrg, 0, sizeof( *rrg ) );
// NOTE(review): epts holds endpoint_t* values, yet each slot is sized with sizeof( endpoint_t );
// this over-allocates when endpoint_t is larger than a pointer -- consider sizeof( *rrg->epts )
// NOTE(review): confirm that rrg is released when this second allocation fails
184 if( (rrg->epts = (endpoint_t **) malloc( sizeof( endpoint_t ) * MAX_EP_GROUP )) == NULL ) {
185 rmr_vlog( RMR_VL_WARN, "rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group );
// clear the array using the same byte count that was passed to malloc
188 memset( rrg->epts, 0, sizeof( endpoint_t ) * MAX_EP_GROUP );
// publish the new group in the entry
190 rte->rrgroups[group] = rrg;
191 //fprintf( stderr, ">>>> added new rrg grp=%d to rte @ 0x%p rrg=%p\n", group, rte, rte->rrgroups[group] );
193 rrg->ep_idx = 0; // next endpoint to send to
194 rrg->nused = 0; // number populated
195 rrg->nendpts = MAX_EP_GROUP; // number allocated
197 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rrg added to rte: mtype=%d group=%d\n", rte->mtype, group );
200 ep = rt_ensure_ep( rt, ep_name ); // get the ep and create one if not known
// the array is fixed at nendpts slots; a full group is only warned about
203 if( rrg->nused >= rrg->nendpts ) {
204 // future: reallocate
205 rmr_vlog( RMR_VL_WARN, "endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group );
// place the endpoint in the next free slot
// NOTE(review): the statement that advances rrg->nused is not visible here; confirm
209 rrg->epts[rrg->nused] = ep;
213 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "endpoint added to mtype/group: %d/%d %s nused=%d\n", rte->mtype, group, ep_name, rrg->nused );
219 Given a name, find the nano socket needed to send to it. Returns the socket via
220 the user pointer passed in and sets the return value to true (1). If the
221 endpoint cannot be found false (0) is returned.
// uta_epsock_byname: find the socket for a named endpoint, connecting it first when needed.
//	ctx	context supplying the route table (ctx->rtable) and the SI handle (ctx->si_ctx).
//	ep_name	endpoint name used as the key into rt->hash.
//	nn_sock	receives ep->nn_sock.
//	uepp	optional; when not nil the caller also wants the endpoint pointer.
// NOTE(review): the return statements and the assignments to state are not visible here;
// confirm the returned value against the complete function.
223 static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpoint_t** uepp ) {
// with PARINOID_CHECKS set, ctx, its route table and its SI handle are each tested for nil
229 if( PARINOID_CHECKS ) {
230 if( ctx == NULL || (rt = ctx->rtable) == NULL || (si_ctx = ctx->si_ctx) == NULL ) {
234 rt = ctx->rtable; // faster but more risky
235 si_ctx = ctx->si_ctx;
// look the endpoint up by name
// NOTE(review): the third argument 1 presumably selects the symbol table space used for
// endpoint names -- confirm against rmr_sym_get()
238 ep = rmr_sym_get( rt->hash, ep_name, 1 );
239 if( uepp != NULL ) { // caller needs endpoint too, give it back
// NOTE(review): ep_name is tested for nil only on the line after this trace, yet it has
// already been passed to rmr_sym_get() and to the %s conversion here -- confirm callers never pass nil
243 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "get ep by name for %s not in hash!\n", ep_name );
244 if( ! ep_name || (ep = rt_ensure_ep( rt, ep_name)) == NULL ) { // create one if not in rt (support rts without entry in our table)
249 if( ! ep->open ) { // not open -- connect now
250 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "get ep by name for %s session not started... starting\n", ep_name );
251 if( ep->addr == NULL ) { // name didn't resolve before, try again
// NOTE(review): no check of the strdup() result is visible here
252 ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup
// a non-zero result from uta_link2() is treated as a successful connection
254 if( uta_link2( si_ctx, ep ) ) { // find entry in table and create link
257 *nn_sock = ep->nn_sock; // pass socket back to caller
258 fd2ep_add( ctx, ep->nn_sock, ep ); // map fd to this ep for disc cleanup
260 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_bn: connection state: %s %s\n", state ? "[OK]" : "[FAIL]", ep->name );
// hand back the socket already held by the endpoint
// NOTE(review): presumably this is the already-open path; the enclosing else is not visible
262 *nn_sock = ep->nn_sock;
270 Make a round robin selection within a round robin group for a route table
271 entry. Returns the nanomsg socket if there is a rte for the message
272 key, and group is defined. Socket is returned via pointer in the parm
275 The group is the group number to select from.
277 The user supplied (via pointer to) integer 'more' will be set if there are
278 additional groups beyond the one selected. This allows the caller to
279 easily iterate over the group list -- more is set when the group should
280 be incremented and the function invoked again. Groups start at 0.
282 The return value is true (>0) if the socket was found and *nn_sock was updated
283 and false (0) if there is no associated socket for the msg type, group combination.
284 We return the index+1 from the round robin table on success so that we can verify
285 during test that different entries are being selected; we cannot depend on the nng
286 socket being different as we could with nano.
288 NOTE: The round robin selection index increment might collide with other
289 threads if multiple threads are attempting to send to the same round
290 robin group; the consequences are small and avoid locking. The only side
291 effect is either sending two messages in a row to, or skipping, an endpoint.
292 Both of these, in the grand scheme of things, are minor compared to the
293 overhead of grabbing a lock on each call.
// uta_epsock_rr: pick the next endpoint of a round robin group and hand back its socket.
//	ctx	context supplying the SI handle (ctx->si_ctx) and used for the fd to endpoint map.
//	rte	route table entry holding the groups.
//	group	index of the group to select from; must satisfy 0 <= group < rte->nrrgroups.
//	more	set to non-zero when the next group slot of the entry is populated, otherwise 0.
//	nn_sock	receives the socket of the selected endpoint.
//	uepp	optional; when not nil the caller also wants the endpoint pointer.
// state starts as FALSE and becomes idx + 1 when an endpoint is chosen from a group with
// more than one member.
// NOTE(review): the return statements and several assignments to state are not visible
// here; confirm the returned value for each path against the complete function.
295 static int uta_epsock_rr( uta_ctx_t* ctx, rtable_ent_t* rte, int group, int* more, int* nn_sock, endpoint_t** uepp ) {
297 endpoint_t* ep; // selected end point
298 int state = FALSE; // processing state
// with PARINOID_CHECKS set, ctx and its SI handle are each tested for nil
303 if( PARINOID_CHECKS ) {
304 if( ctx == NULL || (si_ctx = ctx->si_ctx) == NULL ) {
308 si_ctx = ctx->si_ctx;
311 //fprintf( stderr, ">>>> epsock_rr selecting: grp=%d mtype=%d ngrps=%d\n", group, rte->mtype, rte->nrrgroups );
313 if( ! more ) { // eliminate checks each time we need to use
317 if( ! nn_sock ) { // user didn't supply a pointer
318 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr invalid nnsock pointer\n" );
// debug trace for a nil route table entry (NOTE(review): the guarding test is not visible)
325 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr rte was nil; nothing selected\n" );
// reject a group index outside of the slots held by the entry
330 if( group < 0 || group >= rte->nrrgroups ) {
331 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "group out of range: group=%d max=%d\n", group, rte->nrrgroups );
336 if( (rrg = rte->rrgroups[group]) == NULL ) {
337 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rrg not found for group %d (ptr rrgroups[g] == nil)\n", group );
338 *more = 0; // groups are inserted contig, so nothing should be after a nil pointer
// group+1 is inspected only when it is a valid slot index
342 *more = group < rte->nrrgroups-1 ? (rte->rrgroups[group+1] != NULL): 0; // more if something in next group slot
// selection depends on how many endpoints the group holds
344 switch( rrg->nused ) {
345 case 0: // nothing allocated, just punt
346 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "nothing allocated for the rrg\n" );
349 case 1: // exactly one, no rr to deal with
351 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "_rr returning socket with one choice in group \n" );
355 default: // need to pick one and adjust rr counts
356 idx = rrg->ep_idx++ % rrg->nused; // increment is made without a lock; concurrent senders may repeat or skip an endpoint
357 ep = rrg->epts[idx]; // select next endpoint
// NOTE(review): this trace prints rrg->ep_idx after the post-increment above, not the
// idx that was selected -- confirm that this is intended
358 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "_rr returning socket with multiple choices in group idx=%d \n", rrg->ep_idx );
359 state = idx + 1; // unit test checks to see that we're cycling through, so must not just be TRUE
363 if( uepp != NULL ) { // caller may need reference to endpoint too; give it if pointer supplied
366 if( state ) { // end point selected, open if not, get socket either way
367 if( ! ep->open ) { // not connected
368 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr selected endpoint not yet open; opening %s\n", ep->name );
369 if( ep->addr == NULL ) { // name didn't resolve before, try again
// NOTE(review): no check of the strdup() result is visible here
370 ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup
// a non-zero result from uta_link2() is treated as a successful connection
373 if( uta_link2( si_ctx, ep ) ) { // find entry in table and create link
375 *nn_sock = ep->nn_sock; // pass socket back to caller
376 fd2ep_add( ctx, ep->nn_sock, ep ); // map fd to ep for disc cleanup
380 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" );
// hand back the socket already held by the endpoint
// NOTE(review): presumably this is the already-open path; the enclosing else is not visible
382 *nn_sock = ep->nn_sock;
386 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr returns state=%d\n", state );
391 Finds the rtable entry which matches the key. Returns a nil pointer if
392 no entry is found. If try_alternate is set, then we will attempt
393 to find the entry with a key based only on the message type.
// uta_get_rte: look up the route table entry for a subscription id and message type pair.
//	rt	route table to search; a nil table or a nil rt->hash is caught by the first test.
//	sid	subscription id combined with mtype to build the primary key.
//	mtype	message type.
//	try_alt	when non-zero, and sid is not UNSET_SUBID, a second lookup keyed on
//		UNSET_SUBID and mtype is made if the first lookup finds nothing.
// NOTE(review): the return statements are not visible here; presumably rte (possibly nil)
// is what is handed back -- confirm.
395 static inline rtable_ent_t* uta_get_rte( route_table_t *rt, int sid, int mtype, int try_alt ) {
396 uint64_t key; // key is sub id and mtype banged together
397 rtable_ent_t* rte; // the entry we found
399 if( rt == NULL || rt->hash == NULL ) {
403 key = build_rt_key( sid, mtype ); // first try with a 'full' key
// lookup by numeric key; this branch is taken on a hit, or on a miss when the alternate is not allowed
404 if( ((rte = rmr_sym_pull( rt->hash, key )) != NULL) || ! try_alt ) { // found or not allowed to try the alternate, return what we have
// the alternate key differs from the first only when a real sub id was given
408 if( sid != UNSET_SUBID ) { // not found, and allowed to try alternate; and the sub_id was set
409 key = build_rt_key( UNSET_SUBID, mtype ); // rebuild key
410 rte = rmr_sym_pull( rt->hash, key ); // see what we get with this
417 Return a string of count information. E.g.:
418 <ep-name>:<port> <good> <hard-fail> <soft-fail>
420 Caller must free the string allocated if a buffer was not provided.
422 Pointer returned is to a freshly allocated string, or the user buffer
425 If the endpoint passed is a nil pointer, then we return a nil -- caller
// get_ep_counts: format the endpoint name and three of its send counters into a string.
//	ep	endpoint supplying name, scounts[EPSC_GOOD], scounts[EPSC_FAIL] and scounts[EPSC_TRANS].
//	ubuf	caller supplied buffer.
//	ubuf_len	size passed to snprintf() and used to size the allocation below.
// NOTE(review): the logic choosing between ubuf and a fresh allocation, the nil check on
// ep, and the return statement are not visible here; confirm against the complete function.
428 static inline char* get_ep_counts( endpoint_t* ep, char* ubuf, int ubuf_len ) {
429 char* rs; // result string
// allocate ubuf_len bytes for the result
// NOTE(review): no nil check on the malloc() result is visible before rs is written below
439 rs = malloc( sizeof( char ) * ubuf_len );
// output is name followed by the good, fail and trans counters; snprintf() limits it to ubuf_len bytes
// NOTE(review): the %lld conversions expect the scounts elements to be long long -- confirm
442 snprintf( rs, ubuf_len, "%s %lld %lld %lld", ep->name, ep->scounts[EPSC_GOOD], ep->scounts[EPSC_FAIL], ep->scounts[EPSC_TRANS] );
448 // ---- fd to ep functions --------------------------------------------------------------------------
451 Create the hash which maps file descriptors to endpoints. We need this
452 to easily mark an endpoint as disconnected when we are notified.
// fd2ep_init: create the table that maps file descriptors to endpoints.
//	ctx	context that owns the table; nothing is done when ctx is nil or when
//		ctx->fd2ep is already set, so a repeated call does not replace the table.
454 static void fd2ep_init( uta_ctx_t* ctx ) {
455 if( ctx && ! ctx->fd2ep ) {
// NOTE(review): 129 is presumably the initial table size -- confirm against rmr_sym_alloc()
456 ctx->fd2ep = rmr_sym_alloc( 129 );
461 Add an entry into the fd2ep hash which points to the given ep.
// fd2ep_add: record that file descriptor fd belongs to endpoint ep.
//	ctx	context holding the fd2ep table; nothing is done when ctx or the table is nil.
//	fd	file descriptor, widened to uint64_t for use as the numeric key.
//	ep	endpoint stored as the value for that key.
463 static void fd2ep_add( uta_ctx_t* ctx, int fd, endpoint_t* ep ) {
464 if( ctx && ctx->fd2ep ) {
465 rmr_sym_map( ctx->fd2ep, (uint64_t) fd, (void *) ep );
470 Given a file descriptor fetches the related endpoint from the hash and
471 deletes the entry from the hash (when we detect a disconnect).
// fd2ep_del: fetch the endpoint mapped to fd and delete that mapping from the table.
//	ctx	context holding the fd2ep table; ep stays nil when ctx or the table is nil.
//	fd	file descriptor, widened to uint64_t for use as the numeric key.
// NOTE(review): the return statement is not visible here; presumably ep is returned -- confirm.
473 static endpoint_t* fd2ep_del( uta_ctx_t* ctx, int fd ) {
474 endpoint_t* ep = NULL;
476 if( ctx && ctx->fd2ep ) {
// fetch the endpoint first so that it is still available after the entry is deleted
477 ep = rmr_sym_pull( ctx->fd2ep, (uint64_t) fd );
// NOTE(review): any condition guarding this delete (for example ep not nil) is not visible here
479 rmr_sym_ndel( ctx->fd2ep, (uint64_t) fd );
487 Given a file descriptor fetches the related endpoint from the hash.
488 Returns nil if there is no map.
// fd2ep_get: fetch the endpoint mapped to fd; the mapping itself is left in place by the visible code.
//	ctx	context holding the fd2ep table; ep stays nil when ctx or the table is nil.
//	fd	file descriptor, widened to uint64_t for use as the numeric key.
// NOTE(review): the remainder of this function, including its return statement, is not
// visible here; presumably ep is returned -- confirm.
490 static endpoint_t* fd2ep_get( uta_ctx_t* ctx, int fd ) {
491 endpoint_t* ep = NULL;
493 if( ctx && ctx->fd2ep ) {
494 ep = rmr_sym_pull( ctx->fd2ep, (uint64_t) fd );