Correct table clear bug in route table ready
[ric-plt/lib/rmr.git] / src / rmr / si / src / rtable_si_static.c
1 // vim: ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rtable_si_static.c
23         Abstract:       Route table management functions which depend on the underlying
24                                 transport mechanism and thus cannot be included with the generic
25                                 route table functions.
26
27                                 This module is designed to be included by any module (main) needing
28                                 the static/private stuff.
29
30         Author:         E. Scott Daniels
31         Date:           29 November 2018
32 */
33
34 #ifndef rtable_static_c
35 #define rtable_static_c
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <netdb.h>
40 #include <errno.h>
41 #include <string.h>
42 #include <errno.h>
43 #include <fcntl.h>
44 #include <sys/types.h>
45 #include <sys/stat.h>
46 #include <unistd.h>
47
48
49 // -----------------------------------------------------------------------------------------------------
50
51 /*
52         Mark an endpoint closed because it's in a failing state.
53 */
54 static void uta_ep_failed( endpoint_t* ep ) {
55         if( ep != NULL ) {
56                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "connection to %s was closed\n", ep->name );
57                 ep->open = FALSE;
58         }
59 }
60
61 /*
62         Establish a TCP connection to the indicated target (IP address).
63         Target assumed to be address:port.  The new socket is returned via the
64         user supplied pointer so that a success/fail code is returned directly.
65         Return value is 0 (false) on failure, 1 (true)  on success.
66
67         In order to support multi-threaded user applications we must hold a lock before
68         we attempt to create a dialer and connect. NNG is thread safe, but we can
69         get things into a bad state if we allow a collision here.  The lock grab
70         only happens on the intial session setup.
71 */
72 //static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) {
73 static int uta_link2( uta_ctx_t *ctx, endpoint_t* ep ) {
74         static int      flags = 0;
75         char*           target;
76         char            conn_info[SI_MAX_ADDR_LEN];     // string to give to nano to make the connection
77         char*           addr;
78         int                     state = FALSE;
79         char*           tok;
80
81         if( ep == NULL ) {
82                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "link2 ep was nil!\n" );
83                 return FALSE;
84         }
85
86         target = ep->name;                              // always give name to transport so changing dest IP does not break reconnect
87         if( target == NULL  ||  (addr = strchr( target, ':' )) == NULL ) {              // bad address:port
88                 if( ep->notify ) {
89                         rmr_vlog( RMR_VL_WARN, "rmr: link2: unable to create link: bad target: %s\n", target == NULL ? "<nil>" : target );
90                         ep->notify = 0;
91                 }
92                 return FALSE;
93         }
94
95         pthread_mutex_lock( &ep->gate );                        // grab the lock
96         if( ep->open ) {
97                 pthread_mutex_unlock( &ep->gate );
98                 return TRUE;
99         }
100
101         snprintf( conn_info, sizeof( conn_info ), "%s", target );
102         errno = 0;
103         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "link2 attempting connection with: %s\n", conn_info );
104         if( (ep->nn_sock = SIconnect( ctx->si_ctx, conn_info )) < 0 ) {
105                 pthread_mutex_unlock( &ep->gate );
106
107                 if( ep->notify ) {                                                      // need to notify if set
108                         rmr_vlog( RMR_VL_WARN, "rmr: link2: unable to connect  to target: %s: %d %s\n", target, errno, strerror( errno ) );
109                         ep->notify = 0;
110                 }
111                 return FALSE;
112         }
113
114         if( DEBUG ) rmr_vlog( RMR_VL_INFO, "rmr_si_link2: connection was successful to: %s\n", target );
115
116         ep->open = TRUE;                                                // set open/notify before giving up lock
117         fd2ep_add( ctx, ep->nn_sock, ep );              // map fd to ep for disc cleanup (while we have the lock)
118
119         if( ! ep->notify ) {                                            // if we yammered about a failure, indicate finally good
120                 rmr_vlog( RMR_VL_INFO, "rmr: link2: connection finally establisehd with target: %s\n", target );
121                 ep->notify = 1;
122         }
123
124         pthread_mutex_unlock( &ep->gate );
125         return TRUE;
126 }
127
128 /*
129         This provides a protocol independent mechanism for establishing the connection to an endpoint.
130         Return is true (1) if the link was opened; false on error.
131 */
132 static int rt_link2_ep( void* vctx, endpoint_t* ep ) {
133         uta_ctx_t* ctx;
134
135         if( ep == NULL ) {
136                 return FALSE;
137         }
138
139         if( ep->open )  {                       // already open, do nothing
140                 return TRUE;
141         }
142
143         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
144                 return FALSE;
145         }
146
147         uta_link2( ctx, ep );
148         return ep->open;
149 }
150
151
152 /*
153         Add an endpoint to a route table entry for the group given. If the endpoint isn't in the
154         hash we add it and create the endpoint struct.
155
156         The caller must supply the specific route table (we assume it will be pending, but they
157         could live on the edge and update the active one, though that's not at all a good idea).
158 */
159 extern endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group  ) {
160         endpoint_t*     ep;
161         rrgroup_t* rrg;                         // pointer at group to update
162
163         if( ! rte || ! rt ) {
164                 rmr_vlog( RMR_VL_WARN, "uda_add_ep didn't get a valid rt and/or rte pointer\n" );
165                 return NULL;
166         }
167
168         if( rte->nrrgroups <= group || group < 0 ) {
169                 rmr_vlog( RMR_VL_WARN, "uda_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups );
170                 return NULL;
171         }
172
173         if( (rrg = rte->rrgroups[group]) == NULL ) {
174                 if( (rrg = (rrgroup_t *) malloc( sizeof( *rrg ) )) == NULL ) {
175                         rmr_vlog( RMR_VL_WARN, "rmr_add_ep: malloc failed for round robin group: group=%d\n", group );
176                         return NULL;
177                 }
178                 memset( rrg, 0, sizeof( *rrg ) );
179
180                 if( (rrg->epts = (endpoint_t **) malloc( sizeof( endpoint_t* ) * MAX_EP_GROUP )) == NULL ) {
181                         rmr_vlog( RMR_VL_WARN, "rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group );
182                         free( rrg );
183                         return NULL;
184                 }
185                 memset( rrg->epts, 0, sizeof( endpoint_t* ) * MAX_EP_GROUP );
186
187                 rte->rrgroups[group] = rrg;
188
189                 rrg->ep_idx = 0;                                                // next endpoint to send to
190                 rrg->nused = 0;                                                 // number populated
191                 rrg->nendpts = MAX_EP_GROUP;                    // number allocated
192
193                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rrg added to rte: mtype=%d group=%d\n", rte->mtype, group );
194         }
195
196         ep = rt_ensure_ep( rt, ep_name );                       // get the ep and create one if not known
197
198         if( rrg != NULL ) {
199                 if( rrg->nused >= rrg->nendpts ) {
200                         // future: reallocate
201                         rmr_vlog( RMR_VL_WARN, "endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group );
202                         return NULL;
203                 }
204
205                 rrg->epts[rrg->nused] = ep;
206                 rrg->nused++;
207         }
208
209         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "endpoint added to mtype/group: %d/%d %s nused=%d\n", rte->mtype, group, ep_name, rrg->nused );
210         return ep;
211 }
212
213
214 /*
215         Given a name, find the socket fd needed to send to it. Returns the socket via
216         the user pointer passed in and sets the return value to true (1). If the
217         endpoint cannot be found false (0) is returned.
218 */
219 static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpoint_t** uepp ) {
220         route_table_t*  rt = NULL;
221         si_ctx_t*               si_ctx = NULL;
222         endpoint_t*             ep;
223         int                             state = FALSE;
224
225         if( PARANOID_CHECKS ) {
226                 if( ctx == NULL ) {
227                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_byname: paranoia check pop ctx=%p rt=%p\n", ctx, rt );
228                         return FALSE;
229                 }
230                 if( (si_ctx = ctx->si_ctx) == NULL ) {
231                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_byname: paranoia check pop sictx is nil\n" );
232                         return FALSE;
233                 }
234                 if( (rt = get_rt( ctx )) == NULL ) {                            // get active rt and bump ref count
235                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_byname: paranoia check pop no rtable\n" );
236                         return FALSE;
237                 }
238         } else {
239                 rt = get_rt( ctx );                             // get active rt and bump ref count
240                 si_ctx = ctx->si_ctx;
241         }
242
243         ep =  rmr_sym_get( rt->ephash, ep_name, 1 );
244         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_byname: ep not found: %s\n", ep_name );
245         if( uepp != NULL ) {                                                    // caller needs endpoint too, give it back
246                 *uepp = ep;
247         }
248         if( ep == NULL ) {
249                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "get ep by name for %s not in hash!\n", ep_name );
250                 if( ! ep_name || (ep = rt_ensure_ep( rt, ep_name)) == NULL ) {                          // create one if not in rt (support rts without entry in our table)
251                         release_rt( ctx, rt );                                                  // drop ref count
252                         return FALSE;
253                 }
254         }
255         release_rt( ctx, rt );                                                                          // drop ref count
256
257         if( ! ep->open )  {                                                                             // not open -- connect now
258                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "get ep by name for %s session not started... starting\n", ep_name );
259                 if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
260                         ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
261                 }
262                 if( uta_link2( ctx, ep ) ) {                                                                                    // find entry in table and create link
263                         state = TRUE;
264                         ep->open = TRUE;
265                         *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
266                         fd2ep_add( ctx, ep->nn_sock, ep );                              // map fd to this ep for disc cleanup
267                 }
268                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_bn: connection state: %s %s\n", state ? "[OK]" : "[FAIL]", ep->name );
269         } else {
270                 *nn_sock = ep->nn_sock;
271                 state = TRUE;
272         }
273
274         return state;
275 }
276
277 /*
278         Make a round robin selection within a round robin group for a route table
279         entry. Returns the socket fd if there is a rte for the message
280         key, and group is defined. Socket is returned via pointer in the parm
281         list (nn_sock).
282
283         The group is the group number to select from.
284
285         The user supplied (via pointer to) integer 'more' will be set if there are
286         additional groups beyond the one selected. This allows the caller to
287         to easily iterate over the group list -- more is set when the group should
288         be incremented and the function invoked again. Groups start at 0.
289
290         The return value is true (>0) if the socket was found and *nn_sock was updated
291         and false (0) if there is no associated socket for the msg type, group combination.
292         We return the index+1 from the round robin table on success so that we can verify
293         during test that different entries are being seleted.
294
295         NOTE:   The round robin selection index increment might collide with other
296                 threads if multiple threads are attempting to send to the same round
297                 robin group; the consequences are small and avoid locking. The only side
298                 effect is either sending two messages in a row to, or skipping, an endpoint.
299                 Both of these, in the grand scheme of things, is minor compared to the
300                 overhead of grabbing a lock on each call.
301 */
302 static int uta_epsock_rr( uta_ctx_t* ctx, rtable_ent_t* rte, int group, int* more, int* nn_sock, endpoint_t** uepp ) {
303         si_ctx_t*               si_ctx;
304         endpoint_t*     ep;                             // selected end point
305         int  state = FALSE;                     // processing state
306         int dummy;
307         rrgroup_t* rrg;
308         int     idx;
309
310         if( PARANOID_CHECKS ) {
311                 if( ctx == NULL || (si_ctx = ctx->si_ctx) == NULL  ) {
312                         return FALSE;
313                 }
314         } else {
315                 si_ctx = ctx->si_ctx;
316         }
317
318         if( ! more ) {                          // eliminate cheks each time we need to use
319                 more = &dummy;
320         }
321
322         if( ! nn_sock ) {                       // user didn't supply a pointer
323                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr invalid nnsock pointer\n" );
324                 errno = EINVAL;
325                 *more = 0;
326                 return FALSE;
327         }
328
329         if( rte == NULL ) {
330                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr rte was nil; nothing selected\n" );
331                 *more = 0;
332                 return FALSE;
333         }
334
335         if( group < 0 || group >= rte->nrrgroups ) {
336                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "group out of range: group=%d max=%d\n", group, rte->nrrgroups );
337                 *more = 0;
338                 return FALSE;
339         }
340
341         if( (rrg = rte->rrgroups[group]) == NULL ) {
342                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rrg not found for group %d (ptr rrgroups[g] == nil)\n", group );
343                 *more = 0;                                      // groups are inserted contig, so nothing should be after a nil pointer
344                 return FALSE;
345         }
346
347         *more = group < rte->nrrgroups-1 ? (rte->rrgroups[group+1] != NULL): 0; // more if something in next group slot
348
349         switch( rrg->nused ) {
350                 case 0:                         // nothing allocated, just punt
351                         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "nothing allocated for the rrg\n" );
352                         return FALSE;
353
354                 case 1:                         // exactly one, no rr to deal with
355                         ep = rrg->epts[0];
356                         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "_rr returning socket with one choice in group \n" );
357                         state = TRUE;
358                         break;
359
360                 default:                                                                                // need to pick one and adjust rr counts
361                         idx = rrg->ep_idx++ % rrg->nused;                       // see note above
362                         ep = rrg->epts[idx];                                            // select next endpoint
363                         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "_rr returning socket with multiple choices in group idx=%d \n", rrg->ep_idx );
364                         state = idx + 1;                                                        // unit test checks to see that we're cycling through, so must not just be TRUE
365                         break;
366         }
367
368         if( uepp != NULL ) {                                                            // caller may need refernce to endpoint too; give it if pointer supplied
369                 *uepp = ep;
370         }
371         if( state ) {                                                                           // end point selected, open if not, get socket either way
372                 if( ! ep->open ) {                                                              // not connected
373                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr selected endpoint not yet open; opening %s\n", ep->name );
374                         if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
375                                 ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
376                         }
377
378                         if( uta_link2( ctx, ep ) ) {                                                                                    // find entry in table and create link
379                                 ep->open = TRUE;
380                                 *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
381                                 fd2ep_add( ctx, ep->nn_sock, ep );                              // map fd to ep for disc cleanup
382                         } else {
383                                 state = FALSE;
384                         }
385                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" );
386                 } else {
387                         *nn_sock = ep->nn_sock;
388                 }
389         }
390
391         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr returns state=%d\n", state );
392         return state;
393 }
394
395 /*
396         Given a message, use the meid field to find the owner endpoint for the meid.
397         The owner ep is then used to extract the socket through which the message
398         is sent. This returns TRUE if we found a socket and it was written to the
399         nn_sock pointer; false if we didn't.
400
401         We've been told that the meid is a string, thus we count on it being a nil
402         terminated set of bytes.
403
404         If we return false the caller's ep reference may NOT be valid or even nil.
405 */
406 static int epsock_meid( uta_ctx_t* ctx, route_table_t *rtable, rmr_mbuf_t* msg, int* nn_sock, endpoint_t** uepp ) {
407         endpoint_t*     ep;                             // seected end point
408         int     state = FALSE;                  // processing state
409         char*   meid;
410         si_ctx_t*       si_ctx;
411
412         if( PARANOID_CHECKS ) {
413                 if( ctx == NULL || (si_ctx = ctx->si_ctx) == NULL  ) {
414                         return FALSE;
415                 }
416         } else {
417                 si_ctx = ctx->si_ctx;
418         }
419
420         errno = 0;
421         if( ! nn_sock || msg == NULL || rtable == NULL ) {                      // missing stuff; bail fast
422                 errno = EINVAL;
423                 return FALSE;
424         }
425
426         meid = ((uta_mhdr_t *) msg->header)->meid;
427
428         ep = get_meid_owner( rtable, meid );
429         if( uepp != NULL ) {                                                            // caller needs refernce to endpoint too
430                 *uepp = ep;
431         }
432
433         if( ep == NULL ) {
434                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_meid: no ep in hash for (%s)\n", meid );
435                 return FALSE;
436         }
437
438         state = TRUE;
439         if( ! ep->open ) {                                                              // not connected
440                 if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
441                         ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
442                 }
443
444                 if( uta_link2( ctx, ep ) ) {                            // find entry in table and create link
445                         ep->open = TRUE;
446                         *nn_sock = ep->nn_sock;                                 // pass socket back to caller
447                 } else {
448                         state = FALSE;
449                 }
450                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_meid: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" );
451         } else {
452                 *nn_sock = ep->nn_sock;
453         }
454
455         return state;
456 }
457
458 /*
459         Finds the rtable entry which matches the key. Returns a nil pointer if
460         no entry is found. If try_alternate is set, then we will attempt
461         to find the entry with a key based only on the message type.
462 */
463 static inline rtable_ent_t*  uta_get_rte( route_table_t *rt, int sid, int mtype, int try_alt ) {
464         uint64_t key;                   // key is sub id and mtype banged together
465         rtable_ent_t* rte;              // the entry we found
466
467         if( rt == NULL || rt->hash == NULL ) {
468                 return NULL;
469         }
470
471         key = build_rt_key( sid, mtype );                                                                                       // first try with a 'full' key
472         if( ((rte = rmr_sym_pull( rt->hash, key )) != NULL)  ||  ! try_alt ) {          // found or not allowed to try the alternate, return what we have
473                 return rte;
474         }
475
476         if( sid != UNSET_SUBID ) {                                                              // not found, and allowed to try alternate; and the sub_id was set
477                 key = build_rt_key( UNSET_SUBID, mtype );                       // rebuild key
478                 rte = rmr_sym_pull( rt->hash, key );                            // see what we get with this
479         }
480
481         return rte;
482 }
483
484 /*
485         Return a string of count information. E.g.:
486                 <ep-name>:<port> <good> <hard-fail> <soft-fail>
487
488         Caller must free the string allocated if a buffer was not provided.
489
490         Pointer returned is to a freshly allocated string, or the user buffer
491         for convenience.
492
493         If the endpoint passed is a nil pointer, then we return a nil -- caller
494         must check!
495 */
496 static inline char* get_ep_counts( endpoint_t* ep, char* ubuf, int ubuf_len ) {
497         char*   rs;                     // result string
498
499         if( ep == NULL ) {
500                 return NULL;
501         }
502
503         if( ubuf != NULL ) {
504                 rs = ubuf;
505         } else {
506                 ubuf_len = 256;
507                 rs = malloc( sizeof( char ) * ubuf_len );
508         }
509
510         snprintf( rs, ubuf_len, "%s %lld %lld %lld", ep->name, ep->scounts[EPSC_GOOD], ep->scounts[EPSC_FAIL], ep->scounts[EPSC_TRANS] );
511
512         return rs;
513 }
514
515
516 // ---- fd to ep functions --------------------------------------------------------------------------
517
518 /*
519         Create the hash which maps file descriptors to endpoints. We need this
520         to easily mark an endpoint as disconnected when we are notified. Thus we
521         expect these to be driven very seldomly; locking should not be an issue.
522         Locking is needed to prevent problems when the user application is multi-
523         threaded and attempting to (re)connect from concurrent threads.
524 */
525 static void fd2ep_init( uta_ctx_t* ctx ) {
526
527         if( ctx  && ! ctx->fd2ep ) {
528                 ctx->fd2ep = rmr_sym_alloc( 129 );
529
530                 if( ctx->fd2ep_gate == NULL ) {
531                         ctx->fd2ep_gate = (pthread_mutex_t *) malloc( sizeof( *ctx->fd2ep_gate ) );
532                         if( ctx->fd2ep_gate != NULL ) {
533                                 pthread_mutex_init( ctx->fd2ep_gate, NULL );
534                         }
535                 }
536         }
537 }
538
539 /*
540         Add an entry into the fd2ep hash to map the FD to the endpoint.
541 */
542 static void fd2ep_add( uta_ctx_t* ctx, int fd, endpoint_t* ep ) {
543         if( ctx && ctx->fd2ep ) {
544                 pthread_mutex_lock( ctx->fd2ep_gate );
545
546                 rmr_sym_map( ctx->fd2ep, (uint64_t) fd, (void *) ep );
547
548                 pthread_mutex_unlock( ctx->fd2ep_gate );
549         }
550 }
551
552 /*
553         Given a file descriptor this fetches the related endpoint from the hash and
554         deletes the entry from the hash (when we detect a disconnect).
555
556         This will also set the state on the ep open to false, and revoke the
557         FD (nn_socket).
558 */
559 static endpoint_t*  fd2ep_del( uta_ctx_t* ctx, int fd ) {
560         endpoint_t* ep = NULL;
561
562         if( ctx && ctx->fd2ep ) {
563                 ep = rmr_sym_pull(  ctx->fd2ep, (uint64_t) fd );
564                 if( ep ) {
565                         pthread_mutex_lock( ctx->fd2ep_gate );
566
567                         rmr_sym_ndel(  ctx->fd2ep, (uint64_t) fd );
568
569                         pthread_mutex_unlock( ctx->fd2ep_gate );
570                 }
571         }
572
573         return ep;
574 }
575
576 /*
577         Given a file descriptor fetches the related endpoint from the hash.
578         Returns nil if there is no reference in the hash.
579 */
580 static endpoint_t*  fd2ep_get( uta_ctx_t* ctx, int fd ) {
581         endpoint_t* ep = NULL;
582
583         if( ctx && ctx->fd2ep ) {
584                 pthread_mutex_lock( ctx->fd2ep_gate );
585
586                 ep = rmr_sym_pull(  ctx->fd2ep, (uint64_t) fd );
587
588                 pthread_mutex_unlock( ctx->fd2ep_gate );
589         }
590
591         return ep;
592 }
593
594
595 #endif