1 // vim: ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019-2020 Nokia
5 Copyright (c) 2018-2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: rtable_si_static.c
23 Abstract: Route table management functions which depend on the underlying
24 transport mechanism and thus cannot be included with the generic
25 route table functions.
27 This module is designed to be included by any module (main) needing
28 the static/private stuff.
30 Author: E. Scott Daniels
31 Date: 29 November 2018
34 #ifndef rtable_static_c
35 #define rtable_static_c
44 #include <sys/types.h>
49 // -----------------------------------------------------------------------------------------------------
52 Mark an endpoint closed because it's in a failing state.
// uta_ep_failed -- failure hook for an endpoint whose connection was closed.
// The visible code only emits a debug log naming the endpoint; the rest of the body
// is not visible here (NOTE(review): the header says it marks ep closed -- confirm).
// ep is dereferenced (ep->name) with no NULL check visible in this view.
54 static void uta_ep_failed( endpoint_t* ep ) {
56 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "connection to %s was closed\n", ep->name );
62 Establish a TCP connection to the indicated target (host name or IP address).
63 Target assumed to be address:port. The new socket is saved in the endpoint
64 (ep->nn_sock) so that a success/fail code can be returned directly.
65 Return value is 0 (false) on failure, 1 (true) on success.
67 In order to support multi-threaded user applications we must hold a lock before
68 we attempt to connect. Even if the underlying transport is thread safe, we can
69 get things into a bad state if we allow a collision here. The lock grab
70 only happens on the initial session setup.
// uta_link2 -- connect ep through the SI transport using ep->name ("address:port").
// The connect is performed while holding ep->gate. On success ep->nn_sock holds the
// socket, ep->open is set TRUE and the fd is mapped to ep with fd2ep_add() before the
// lock is released. A target without a ':' is rejected with a warning.
72 //static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) {
73 static int uta_link2( uta_ctx_t *ctx, endpoint_t* ep ) {
76 char conn_info[SI_MAX_ADDR_LEN]; // target string handed to SIconnect() to make the connection
82 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "link2 ep was nil!\n" );
86 target = ep->name; // always give name to transport so changing dest IP does not break reconnect
87 if( target == NULL || (addr = strchr( target, ':' )) == NULL ) { // bad address:port
89 rmr_vlog( RMR_VL_WARN, "rmr: link2: unable to create link: bad target: %s\n", target == NULL ? "<nil>" : target );
95 pthread_mutex_lock( &ep->gate ); // grab the lock
// NOTE(review): the condition guarding this early unlock is not visible here;
// presumably it is the "already open" fast path -- confirm.
97 pthread_mutex_unlock( &ep->gate );
// Bounded copy: a target longer than SI_MAX_ADDR_LEN-1 bytes is silently truncated
// (the snprintf return value is not checked).
101 snprintf( conn_info, sizeof( conn_info ), "%s", target );
103 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "link2 attempting connection with: %s\n", conn_info );
// NOTE(review): on failure errno is logged only after pthread_mutex_unlock(); POSIX leaves
// errno unspecified after a successful call, so if SIconnect() reports errors via errno
// the value should be saved immediately after the failed call.
104 if( (ep->nn_sock = SIconnect( ctx->si_ctx, conn_info )) < 0 ) {
105 pthread_mutex_unlock( &ep->gate );
107 if( ep->notify ) { // need to notify if set
108 rmr_vlog( RMR_VL_WARN, "rmr: link2: unable to connect to target: %s: %d %s\n", target, errno, strerror( errno ) );
114 if( DEBUG ) rmr_vlog( RMR_VL_INFO, "rmr_si_link2: connection was successful to: %s\n", target );
116 ep->open = TRUE; // set open/notify before giving up lock
117 fd2ep_add( ctx, ep->nn_sock, ep ); // map fd to ep for disc cleanup (while we have the lock)
119 if( ! ep->notify ) { // if we yammered about a failure, indicate finally good
// NOTE(review): "establisehd" in the message below is a typo for "established".
120 rmr_vlog( RMR_VL_INFO, "rmr: link2: connection finally establisehd with target: %s\n", target );
124 pthread_mutex_unlock( &ep->gate );
129 This provides a protocol independent mechanism for establishing the connection to an endpoint.
130 Return is true (1) if the link was opened; false on error.
// rt_link2_ep -- transport-independent entry point for opening the link to ep.
// vctx is the uta_ctx_t passed as an opaque pointer; the actual connect is done
// by uta_link2().
132 static int rt_link2_ep( void* vctx, endpoint_t* ep ) {
// ep->open is read here without holding ep->gate; uta_link2() takes the lock
// for the connect itself.
139 if( ep->open ) { // already open, do nothing
143 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
// NOTE(review): uta_link2()'s return value is discarded; what this function returns
// afterwards is not visible here -- confirm it reflects the connect result.
147 uta_link2( ctx, ep );
153 Add an endpoint to a route table entry for the group given. If the endpoint isn't in the
154 hash we add it and create the endpoint struct.
156 The caller must supply the specific route table (we assume it will be pending, but they
157 could live on the edge and update the active one, though that's not at all a good idea).
// uta_add_ep -- add the endpoint named ep_name to round-robin group `group` of rte.
// The group is created on first use with a fixed array of MAX_EP_GROUP endpoint slots.
// The endpoint itself comes from rt_ensure_ep(), which creates it if rt does not know it.
159 extern endpoint_t* uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group ) {
161 rrgroup_t* rrg; // pointer at group to update
163 if( ! rte || ! rt ) {
// NOTE(review): "uda_add_ep" in these runtime messages is a typo for the function name.
164 rmr_vlog( RMR_VL_WARN, "uda_add_ep didn't get a valid rt and/or rte pointer\n" );
// Valid groups are 0 .. nrrgroups-1.
// NOTE(review): the message reports nrrgroups as "max", but the largest valid index is nrrgroups-1.
168 if( rte->nrrgroups <= group || group < 0 ) {
169 rmr_vlog( RMR_VL_WARN, "uda_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups );
173 if( (rrg = rte->rrgroups[group]) == NULL ) {
174 if( (rrg = (rrgroup_t *) malloc( sizeof( *rrg ) )) == NULL ) {
175 rmr_vlog( RMR_VL_WARN, "rmr_add_ep: malloc failed for round robin group: group=%d\n", group );
178 memset( rrg, 0, sizeof( *rrg ) );
// NOTE(review): whether rrg (allocated above) is freed on this failure path is not
// visible here -- confirm there is no leak.
180 if( (rrg->epts = (endpoint_t **) malloc( sizeof( endpoint_t* ) * MAX_EP_GROUP )) == NULL ) {
181 rmr_vlog( RMR_VL_WARN, "rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group );
185 memset( rrg->epts, 0, sizeof( endpoint_t* ) * MAX_EP_GROUP );
187 rte->rrgroups[group] = rrg;
189 rrg->ep_idx = 0; // next endpoint to send to
190 rrg->nused = 0; // number populated
191 rrg->nendpts = MAX_EP_GROUP; // number allocated
193 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rrg added to rte: mtype=%d group=%d\n", rte->mtype, group );
// NOTE(review): a NULL check on the rt_ensure_ep() result is not visible in this view.
196 ep = rt_ensure_ep( rt, ep_name ); // get the ep and create one if not known
// The slot array is fixed size (nendpts); it is never grown here.
199 if( rrg->nused >= rrg->nendpts ) {
200 // future: reallocate
201 rmr_vlog( RMR_VL_WARN, "endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group );
205 rrg->epts[rrg->nused] = ep;
209 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "endpoint added to mtype/group: %d/%d %s nused=%d\n", rte->mtype, group, ep_name, rrg->nused );
215 Given a name, find the socket fd needed to send to it (connecting first if the
216 endpoint is not yet open). Returns the socket via the user pointer passed in and
217 sets the return value to true (1). If no endpoint can be found or created for the name, false (0) is returned.
// uta_epsock_byname -- look up (or create) the endpoint for ep_name in the active route
// table, connect it if it is not open, and hand back its socket via *nn_sock (and the
// endpoint via *uepp when uepp is not NULL). A reference on the active table is taken
// with get_rt() and dropped with release_rt().
219 static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpoint_t** uepp ) {
220 route_table_t* rt = NULL;
221 si_ctx_t* si_ctx = NULL;
// With PARANOID_CHECKS, ctx, si_ctx and the active table are validated; otherwise they
// are assumed non-nil.
225 if( PARANOID_CHECKS ) {
227 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_byname: paranoia check pop ctx=%p rt=%p\n", ctx, rt );
230 if( (si_ctx = ctx->si_ctx) == NULL ) {
231 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_byname: paranoia check pop sictx is nil\n" );
234 if( (rt = get_rt( ctx )) == NULL ) { // get active rt and bump ref count
235 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_byname: paranoia check pop no rtable\n" );
// NOTE(review): on this path a nil result from get_rt() would be dereferenced below (rt->ephash).
239 rt = get_rt( ctx ); // get active rt and bump ref count
240 si_ctx = ctx->si_ctx;
// NOTE(review): ep_name is passed to rmr_sym_get() and formatted with %s before the
// `! ep_name` check further down; a NULL name would need to be rejected earlier.
243 ep = rmr_sym_get( rt->ephash, ep_name, 1 );
// NOTE(review): this follows the lookup directly with no condition, so "ep not found"
// is logged even when the endpoint was found.
244 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_byname: ep not found: %s\n", ep_name );
245 if( uepp != NULL ) { // caller needs endpoint too, give it back
249 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "get ep by name for %s not in hash!\n", ep_name );
250 if( ! ep_name || (ep = rt_ensure_ep( rt, ep_name)) == NULL ) { // create one if not in rt (support rts without entry in our table)
251 release_rt( ctx, rt ); // drop ref count
// NOTE(review): ep is used below after the table reference is dropped; confirm endpoints
// outlive the route-table reference.
255 release_rt( ctx, rt ); // drop ref count
257 if( ! ep->open ) { // not open -- connect now
258 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "get ep by name for %s session not started... starting\n", ep_name );
259 if( ep->addr == NULL ) { // name didn't resolve before, try again
260 ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup
262 if( uta_link2( ctx, ep ) ) { // connect (uta_link2 takes ep->gate)
265 *nn_sock = ep->nn_sock; // pass socket back to caller
// uta_link2() already maps the fd on success (under the lock); this re-maps the same pair.
266 fd2ep_add( ctx, ep->nn_sock, ep ); // map fd to this ep for disc cleanup
268 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_bn: connection state: %s %s\n", state ? "[OK]" : "[FAIL]", ep->name );
270 *nn_sock = ep->nn_sock;
278 Make a round robin selection within a round robin group for a route table
279 entry. Returns the socket fd if there is a rte for the message
280 key, and group is defined. Socket is returned via pointer in the parm
283 The group is the group number to select from.
285 The user supplied (via pointer to) integer 'more' will be set if there are
286 additional groups beyond the one selected. This allows the caller to
287 easily iterate over the group list -- more is set when the group should
288 be incremented and the function invoked again. Groups start at 0.
290 The return value is true (>0) if the socket was found and *nn_sock was updated
291 and false (0) if there is no associated socket for the msg type, group combination.
292 We return the index+1 from the round robin table on success so that we can verify
293 during test that different entries are being selected.
295 NOTE: The round robin selection index increment might collide with other
296 threads if multiple threads are attempting to send to the same round
297 robin group; the consequences are small and avoid locking. The only side
298 effect is either sending two messages in a row to, or skipping, an endpoint.
299 Both of these, in the grand scheme of things, are minor compared to the
300 overhead of grabbing a lock on each call.
// uta_epsock_rr -- pick the next endpoint of round-robin group `group` in rte, connect it
// if needed, and return its socket via *nn_sock. *more is set when the next group slot
// is populated. The non-zero return is idx+1 of the chosen slot (see header comment).
302 static int uta_epsock_rr( uta_ctx_t* ctx, rtable_ent_t* rte, int group, int* more, int* nn_sock, endpoint_t** uepp ) {
304 endpoint_t* ep; // selected end point
305 int state = FALSE; // processing state
310 if( PARANOID_CHECKS ) {
311 if( ctx == NULL || (si_ctx = ctx->si_ctx) == NULL ) {
315 si_ctx = ctx->si_ctx;
318 if( ! more ) { // eliminate checks each time we need to use
322 if( ! nn_sock ) { // user didn't supply a pointer
323 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr invalid nnsock pointer\n" );
330 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr rte was nil; nothing selected\n" );
335 if( group < 0 || group >= rte->nrrgroups ) {
336 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "group out of range: group=%d max=%d\n", group, rte->nrrgroups );
341 if( (rrg = rte->rrgroups[group]) == NULL ) {
342 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rrg not found for group %d (ptr rrgroups[g] == nil)\n", group );
343 *more = 0; // groups are inserted contig, so nothing should be after a nil pointer
347 *more = group < rte->nrrgroups-1 ? (rte->rrgroups[group+1] != NULL): 0; // more if something in next group slot
349 switch( rrg->nused ) {
350 case 0: // nothing allocated, just punt
351 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "nothing allocated for the rrg\n" );
354 case 1: // exactly one, no rr to deal with
356 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "_rr returning socket with one choice in group \n" );
360 default: // need to pick one and adjust rr counts
// NOTE(review): ep_idx is incremented on every send and never reset here. If it is a signed
// int (declaration not visible), it eventually overflows (UB) and the modulo could go
// negative, indexing outside epts. An unsigned counter would avoid this.
361 idx = rrg->ep_idx++ % rrg->nused; // see note above
362 ep = rrg->epts[idx]; // select next endpoint
// NOTE(review): this logs rrg->ep_idx (already incremented) under the label "idx", not idx.
363 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "_rr returning socket with multiple choices in group idx=%d \n", rrg->ep_idx );
364 state = idx + 1; // unit test checks to see that we're cycling through, so must not just be TRUE
368 if( uepp != NULL ) { // caller may need reference to endpoint too; give it if pointer supplied
371 if( state ) { // end point selected, open if not, get socket either way
372 if( ! ep->open ) { // not connected
373 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr selected endpoint not yet open; opening %s\n", ep->name );
374 if( ep->addr == NULL ) { // name didn't resolve before, try again
375 ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup
378 if( uta_link2( ctx, ep ) ) { // connect (uta_link2 takes ep->gate)
380 *nn_sock = ep->nn_sock; // pass socket back to caller
// uta_link2() already maps the fd on success (under the lock); this re-maps the same pair.
381 fd2ep_add( ctx, ep->nn_sock, ep ); // map fd to ep for disc cleanup
385 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" );
387 *nn_sock = ep->nn_sock;
391 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr returns state=%d\n", state );
396 Given a message, use the meid field to find the owner endpoint for the meid.
397 The owner ep is then used to extract the socket through which the message
398 is sent. This returns TRUE if we found a socket and it was written to the
399 nn_sock pointer; false if we didn't.
401 We've been told that the meid is a string, thus we count on it being a nil
402 terminated set of bytes.
404 If we return false, the endpoint reference given back to the caller may be invalid, or even nil.
// epsock_meid -- find the owner endpoint of the message's meid (via get_meid_owner()),
// connect it if needed, and return its socket via *nn_sock.
406 static int epsock_meid( uta_ctx_t* ctx, route_table_t *rtable, rmr_mbuf_t* msg, int* nn_sock, endpoint_t** uepp ) {
407 endpoint_t* ep; // selected end point
408 int state = FALSE; // processing state
412 if( PARANOID_CHECKS ) {
413 if( ctx == NULL || (si_ctx = ctx->si_ctx) == NULL ) {
417 si_ctx = ctx->si_ctx;
421 if( ! nn_sock || msg == NULL || rtable == NULL ) { // missing stuff; bail fast
// NOTE(review): msg->header is dereferenced without a visible NULL check. meid is used
// as a C string, so it must be NUL terminated (see header comment).
426 meid = ((uta_mhdr_t *) msg->header)->meid;
428 ep = get_meid_owner( rtable, meid );
429 if( uepp != NULL ) { // caller needs reference to endpoint too
434 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_meid: no ep in hash for (%s)\n", meid );
439 if( ! ep->open ) { // not connected
440 if( ep->addr == NULL ) { // name didn't resolve before, try again
441 ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup
// uta_link2() maps the fd to ep itself on success, so no fd2ep_add() is needed here.
444 if( uta_link2( ctx, ep ) ) { // connect (uta_link2 takes ep->gate)
446 *nn_sock = ep->nn_sock; // pass socket back to caller
450 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_meid: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" );
452 *nn_sock = ep->nn_sock;
459 Finds the rtable entry which matches the key. Returns a nil pointer if
460 no entry is found. If try_alt is set (and sid is not UNSET_SUBID), then we will attempt
461 to find the entry with a key based only on the message type.
// uta_get_rte -- look up the route table entry for (sid, mtype). When not found and
// try_alt is set, retry with UNSET_SUBID as the sub id (mtype-only key).
463 static inline rtable_ent_t* uta_get_rte( route_table_t *rt, int sid, int mtype, int try_alt ) {
464 uint64_t key; // key is sub id and mtype banged together
465 rtable_ent_t* rte; // the entry we found
467 if( rt == NULL || rt->hash == NULL ) {
471 key = build_rt_key( sid, mtype ); // first try with a 'full' key
472 if( ((rte = rmr_sym_pull( rt->hash, key )) != NULL) || ! try_alt ) { // found or not allowed to try the alternate, return what we have
// When sid is already UNSET_SUBID the alternate key equals the first, so the retry is skipped.
476 if( sid != UNSET_SUBID ) { // not found, and allowed to try alternate; and the sub_id was set
477 key = build_rt_key( UNSET_SUBID, mtype ); // rebuild key
478 rte = rmr_sym_pull( rt->hash, key ); // see what we get with this
485 Return a string of count information. E.g.:
486 <ep-name>:<port> <good> <hard-fail> <soft-fail>
488 Caller must free the string allocated if a buffer was not provided.
490 Pointer returned is to a freshly allocated string, or the user buffer
493 If the endpoint passed is a nil pointer, then we return a nil -- caller
// get_ep_counts -- format "<name> <good> <fail> <trans>" send counters for ep into a
// string of at most ubuf_len bytes (including the NUL).
496 static inline char* get_ep_counts( endpoint_t* ep, char* ubuf, int ubuf_len ) {
497 char* rs; // result string
// NOTE(review): a NULL check on this allocation is not visible in this view.
507 rs = malloc( sizeof( char ) * ubuf_len );
// NOTE(review): %lld requires the scounts elements to be long long (declaration not visible).
510 snprintf( rs, ubuf_len, "%s %lld %lld %lld", ep->name, ep->scounts[EPSC_GOOD], ep->scounts[EPSC_FAIL], ep->scounts[EPSC_TRANS] );
516 // ---- fd to ep functions --------------------------------------------------------------------------
519 Create the hash which maps file descriptors to endpoints. We need this
520 to easily mark an endpoint as disconnected when we are notified. Thus we
521 expect these to be driven very seldom; lock contention should not be an issue.
522 Locking is needed to prevent problems when the user application is multi-
523 threaded and attempting to (re)connect from concurrent threads.
// fd2ep_init -- create the fd->endpoint symbol table and the mutex that guards it,
// if ctx does not already have them.
525 static void fd2ep_init( uta_ctx_t* ctx ) {
527 if( ctx && ! ctx->fd2ep ) {
// NOTE(review): the rmr_sym_alloc() result is not checked here.
528 ctx->fd2ep = rmr_sym_alloc( 129 );
530 if( ctx->fd2ep_gate == NULL ) {
// NOTE(review): if this malloc fails, fd2ep is set but fd2ep_gate stays NULL.
// fd2ep_add/_del/_get test only ctx->fd2ep before locking, so they would then
// call pthread_mutex_lock( NULL ).
531 ctx->fd2ep_gate = (pthread_mutex_t *) malloc( sizeof( *ctx->fd2ep_gate ) );
532 if( ctx->fd2ep_gate != NULL ) {
533 pthread_mutex_init( ctx->fd2ep_gate, NULL );
540 Add an entry into the fd2ep hash to map the FD to the endpoint.
// fd2ep_add -- map fd to ep in ctx->fd2ep while holding ctx->fd2ep_gate.
// Does nothing when ctx or the table is nil.
542 static void fd2ep_add( uta_ctx_t* ctx, int fd, endpoint_t* ep ) {
// NOTE(review): only the table is tested; ctx->fd2ep_gate is assumed non-NULL.
543 if( ctx && ctx->fd2ep ) {
544 pthread_mutex_lock( ctx->fd2ep_gate );
546 rmr_sym_map( ctx->fd2ep, (uint64_t) fd, (void *) ep );
548 pthread_mutex_unlock( ctx->fd2ep_gate );
553 Given a file descriptor this fetches the related endpoint from the hash and
554 deletes the entry from the hash (when we detect a disconnect).
556 This will also set the state on the ep open to false, and revoke the
// fd2ep_del -- look up the endpoint mapped to fd and remove the mapping from ctx->fd2ep.
559 static endpoint_t* fd2ep_del( uta_ctx_t* ctx, int fd ) {
560 endpoint_t* ep = NULL;
562 if( ctx && ctx->fd2ep ) {
// NOTE(review): this lookup happens before fd2ep_gate is taken, so it is not serialised
// with a concurrent fd2ep_add()/fd2ep_del(). Doing the pull under the lock would make
// lookup+delete atomic.
563 ep = rmr_sym_pull( ctx->fd2ep, (uint64_t) fd );
565 pthread_mutex_lock( ctx->fd2ep_gate );
567 rmr_sym_ndel( ctx->fd2ep, (uint64_t) fd );
569 pthread_mutex_unlock( ctx->fd2ep_gate );
577 Given a file descriptor fetches the related endpoint from the hash.
578 Returns nil if there is no reference in the hash.
// fd2ep_get -- return the endpoint mapped to fd (nil if none), reading the table while
// holding ctx->fd2ep_gate.
580 static endpoint_t* fd2ep_get( uta_ctx_t* ctx, int fd ) {
581 endpoint_t* ep = NULL;
// NOTE(review): only the table is tested; ctx->fd2ep_gate is assumed non-NULL.
583 if( ctx && ctx->fd2ep ) {
584 pthread_mutex_lock( ctx->fd2ep_gate );
586 ep = rmr_sym_pull( ctx->fd2ep, (uint64_t) fd );
588 pthread_mutex_unlock( ctx->fd2ep_gate );