Address code analysis issues
[ric-plt/lib/rmr.git] / src / rmr / si / src / rtable_si_static.c
1 // vim: ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rtable_si_static.c
23         Abstract:       Route table management functions which depend on the underlying
24                                 transport mechanism and thus cannot be included with the generic
25                                 route table functions.
26
27                                 This module is designed to be included by any module (main) needing
28                                 the static/private stuff.
29
30         Author:         E. Scott Daniels
31         Date:           29 November 2018
32 */
33
34 #ifndef rtable_static_c
35 #define rtable_static_c
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <netdb.h>
40 #include <errno.h>
41 #include <string.h>
42 #include <errno.h>
43 #include <fcntl.h>
44 #include <sys/types.h>
45 #include <sys/stat.h>
46 #include <unistd.h>
47
48
49 // -----------------------------------------------------------------------------------------------------
50
51 /*
52         Mark an endpoint closed because it's in a failing state.
53 */
54 static void uta_ep_failed( endpoint_t* ep ) {
55         if( ep != NULL ) {
56                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "connection to %s was closed\n", ep->name );
57                 ep->open = FALSE;
58         }
59 }
60
61 /*
62         Establish a TCP connection to the indicated target (IP address).
63         Target assumed to be address:port.  The new socket is returned via the
64         user supplied pointer so that a success/fail code is returned directly.
65         Return value is 0 (false) on failure, 1 (true)  on success.
66
67         In order to support multi-threaded user applications we must hold a lock before
68         we attempt to create a dialer and connect. NNG is thread safe, but we can
69         get things into a bad state if we allow a collision here.  The lock grab
70         only happens on the intial session setup.
71 */
72 //static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) {
73 static int uta_link2( uta_ctx_t *ctx, endpoint_t* ep ) {
74         static int      flags = 0;
75         char*           target;
76         char            conn_info[SI_MAX_ADDR_LEN];     // string to give to nano to make the connection
77         char*           addr;
78         int                     state = FALSE;
79         char*           tok;
80
81         if( ep == NULL ) {
82                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "link2 ep was nil!\n" );
83                 return FALSE;
84         }
85
86         target = ep->name;                              // always give name to transport so changing dest IP does not break reconnect
87         if( target == NULL  ||  (addr = strchr( target, ':' )) == NULL ) {              // bad address:port
88                 if( ep->notify ) {
89                         rmr_vlog( RMR_VL_WARN, "rmr: link2: unable to create link: bad target: %s\n", target == NULL ? "<nil>" : target );
90                         ep->notify = 0;
91                 }
92                 return FALSE;
93         }
94
95         pthread_mutex_lock( &ep->gate );                        // grab the lock
96         if( ep->open ) {
97                 pthread_mutex_unlock( &ep->gate );
98                 return TRUE;
99         }
100
101         snprintf( conn_info, sizeof( conn_info ), "%s", target );
102         errno = 0;
103         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "link2 attempting connection with: %s\n", conn_info );
104         if( (ep->nn_sock = SIconnect( ctx->si_ctx, conn_info )) < 0 ) {
105                 pthread_mutex_unlock( &ep->gate );
106
107                 if( ep->notify ) {                                                      // need to notify if set
108                         rmr_vlog( RMR_VL_WARN, "rmr: link2: unable to connect  to target: %s: %d %s\n", target, errno, strerror( errno ) );
109                         ep->notify = 0;
110                 }
111                 return FALSE;
112         }
113
114         if( DEBUG ) rmr_vlog( RMR_VL_INFO, "rmr_si_link2: connection was successful to: %s\n", target );
115
116         ep->open = TRUE;                                                // set open/notify before giving up lock
117         fd2ep_add( ctx, ep->nn_sock, ep );              // map fd to ep for disc cleanup (while we have the lock)
118
119         if( ! ep->notify ) {                                            // if we yammered about a failure, indicate finally good
120                 rmr_vlog( RMR_VL_INFO, "rmr: link2: connection finally establisehd with target: %s\n", target );
121                 ep->notify = 1;
122         }
123
124         pthread_mutex_unlock( &ep->gate );
125         return TRUE;
126 }
127
128 /*
129         This provides a protocol independent mechanism for establishing the connection to an endpoint.
130         Return is true (1) if the link was opened; false on error.
131 */
132 static int rt_link2_ep( void* vctx, endpoint_t* ep ) {
133         uta_ctx_t* ctx;
134
135         if( ep == NULL ) {
136                 return FALSE;
137         }
138
139         if( ep->open )  {                       // already open, do nothing
140                 return TRUE;
141         }
142
143         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
144                 return FALSE;
145         }
146
147         uta_link2( ctx, ep );
148         return ep->open;
149 }
150
151
152 /*
153         Add an endpoint to a route table entry for the group given. If the endpoint isn't in the
154         hash we add it and create the endpoint struct.
155
156         The caller must supply the specific route table (we assume it will be pending, but they
157         could live on the edge and update the active one, though that's not at all a good idea).
158 */
159 extern endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group  ) {
160         endpoint_t*     ep;
161         rrgroup_t* rrg;                         // pointer at group to update
162
163         if( ! rte || ! rt ) {
164                 rmr_vlog( RMR_VL_WARN, "uda_add_ep didn't get a valid rt and/or rte pointer\n" );
165                 return NULL;
166         }
167
168         if( rte->nrrgroups <= group || group < 0 ) {
169                 rmr_vlog( RMR_VL_WARN, "uda_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups );
170                 return NULL;
171         }
172
173         //fprintf( stderr, ">>>> add ep grp=%d to rte @ 0x%p  rrg=%p\n", group, rte, rte->rrgroups[group] );
174         if( (rrg = rte->rrgroups[group]) == NULL ) {
175                 if( (rrg = (rrgroup_t *) malloc( sizeof( *rrg ) )) == NULL ) {
176                         rmr_vlog( RMR_VL_WARN, "rmr_add_ep: malloc failed for round robin group: group=%d\n", group );
177                         return NULL;
178                 }
179                 memset( rrg, 0, sizeof( *rrg ) );
180
181                 if( (rrg->epts = (endpoint_t **) malloc( sizeof( endpoint_t ) * MAX_EP_GROUP )) == NULL ) {
182                         rmr_vlog( RMR_VL_WARN, "rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group );
183                         free( rrg );
184                         return NULL;
185                 }
186                 memset( rrg->epts, 0, sizeof( endpoint_t ) * MAX_EP_GROUP );
187
188                 rte->rrgroups[group] = rrg;
189                 //fprintf( stderr, ">>>> added new rrg grp=%d to rte @ 0x%p  rrg=%p\n", group, rte, rte->rrgroups[group] );
190
191                 rrg->ep_idx = 0;                                                // next endpoint to send to
192                 rrg->nused = 0;                                                 // number populated
193                 rrg->nendpts = MAX_EP_GROUP;                    // number allocated
194
195                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rrg added to rte: mtype=%d group=%d\n", rte->mtype, group );
196         }
197
198         ep = rt_ensure_ep( rt, ep_name );                       // get the ep and create one if not known
199
200         if( rrg != NULL ) {
201                 if( rrg->nused >= rrg->nendpts ) {
202                         // future: reallocate
203                         rmr_vlog( RMR_VL_WARN, "endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group );
204                         return NULL;
205                 }
206
207                 rrg->epts[rrg->nused] = ep;
208                 rrg->nused++;
209         }
210
211         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "endpoint added to mtype/group: %d/%d %s nused=%d\n", rte->mtype, group, ep_name, rrg->nused );
212         return ep;
213 }
214
215
216 /*
217         Given a name, find the nano socket needed to send to it. Returns the socket via
218         the user pointer passed in and sets the return value to true (1). If the
219         endpoint cannot be found false (0) is returned.
220 */
221 static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpoint_t** uepp ) {
222         route_table_t*  rt = NULL;
223         si_ctx_t*               si_ctx;
224         endpoint_t*             ep;
225         int                             state = FALSE;
226
227         if( PARANOID_CHECKS ) {
228                 if( ctx == NULL || (rt = ctx->rtable) == NULL || (si_ctx = ctx->si_ctx) == NULL  ) {
229                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_byname: parinoia check pop ctx=%p rt=%p\n", ctx, rt );
230                         return FALSE;
231                 }
232         } else {
233                 rt = ctx->rtable;                               // faster but more risky
234                 si_ctx = ctx->si_ctx;
235         }
236
237         ep =  rmr_sym_get( rt->hash, ep_name, 1 );
238         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_byname: ep not found: %s\n", ep_name );
239         if( uepp != NULL ) {                                                    // caller needs endpoint too, give it back
240                 *uepp = ep;
241         }
242         if( ep == NULL ) {
243                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "get ep by name for %s not in hash!\n", ep_name );
244                 if( ! ep_name || (ep = rt_ensure_ep( rt, ep_name)) == NULL ) {                          // create one if not in rt (support rts without entry in our table)
245                         return FALSE;
246                 }
247         }
248
249         if( ! ep->open )  {                                                                             // not open -- connect now
250                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "get ep by name for %s session not started... starting\n", ep_name );
251                 if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
252                         ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
253                 }
254                 if( uta_link2( ctx, ep ) ) {                                                                                    // find entry in table and create link
255                         state = TRUE;
256                         ep->open = TRUE;
257                         *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
258                         fd2ep_add( ctx, ep->nn_sock, ep );                              // map fd to this ep for disc cleanup
259                 }
260                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_bn: connection state: %s %s\n", state ? "[OK]" : "[FAIL]", ep->name );
261         } else {
262                 *nn_sock = ep->nn_sock;
263                 state = TRUE;
264         }
265
266         return state;
267 }
268
269 /*
270         Make a round robin selection within a round robin group for a route table
271         entry. Returns the nanomsg socket if there is a rte for the message
272         key, and group is defined. Socket is returned via pointer in the parm
273         list (nn_sock).
274
275         The group is the group number to select from.
276
277         The user supplied (via pointer to) integer 'more' will be set if there are
278         additional groups beyond the one selected. This allows the caller to
279         to easily iterate over the group list -- more is set when the group should
280         be incremented and the function invoked again. Groups start at 0.
281
282         The return value is true (>0) if the socket was found and *nn_sock was updated
283         and false (0) if there is no associated socket for the msg type, group combination.
284         We return the index+1 from the round robin table on success so that we can verify
285         during test that different entries are being seleted; we cannot depend on the nng
286         socket being different as we could with nano.
287
288         NOTE:   The round robin selection index increment might collide with other
289                 threads if multiple threads are attempting to send to the same round
290                 robin group; the consequences are small and avoid locking. The only side
291                 effect is either sending two messages in a row to, or skipping, an endpoint.
292                 Both of these, in the grand scheme of things, is minor compared to the
293                 overhead of grabbing a lock on each call.
294 */
295 static int uta_epsock_rr( uta_ctx_t* ctx, rtable_ent_t* rte, int group, int* more, int* nn_sock, endpoint_t** uepp ) {
296         si_ctx_t*               si_ctx;
297         endpoint_t*     ep;                             // selected end point
298         int  state = FALSE;                     // processing state
299         int dummy;
300         rrgroup_t* rrg;
301         int     idx;
302
303         if( PARANOID_CHECKS ) {
304                 if( ctx == NULL || (si_ctx = ctx->si_ctx) == NULL  ) {
305                         return FALSE;
306                 }
307         } else {
308                 si_ctx = ctx->si_ctx;
309         }
310
311         //fprintf( stderr, ">>>> epsock_rr selecting: grp=%d mtype=%d ngrps=%d\n", group, rte->mtype, rte->nrrgroups );
312
313         if( ! more ) {                          // eliminate cheks each time we need to use
314                 more = &dummy;
315         }
316
317         if( ! nn_sock ) {                       // user didn't supply a pointer
318                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr invalid nnsock pointer\n" );
319                 errno = EINVAL;
320                 *more = 0;
321                 return FALSE;
322         }
323
324         if( rte == NULL ) {
325                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr rte was nil; nothing selected\n" );
326                 *more = 0;
327                 return FALSE;
328         }
329
330         if( group < 0 || group >= rte->nrrgroups ) {
331                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "group out of range: group=%d max=%d\n", group, rte->nrrgroups );
332                 *more = 0;
333                 return FALSE;
334         }
335
336         if( (rrg = rte->rrgroups[group]) == NULL ) {
337                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rrg not found for group %d (ptr rrgroups[g] == nil)\n", group );
338                 *more = 0;                                      // groups are inserted contig, so nothing should be after a nil pointer
339                 return FALSE;
340         }
341
342         *more = group < rte->nrrgroups-1 ? (rte->rrgroups[group+1] != NULL): 0; // more if something in next group slot
343
344         switch( rrg->nused ) {
345                 case 0:                         // nothing allocated, just punt
346                         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "nothing allocated for the rrg\n" );
347                         return FALSE;
348
349                 case 1:                         // exactly one, no rr to deal with
350                         ep = rrg->epts[0];
351                         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "_rr returning socket with one choice in group \n" );
352                         state = TRUE;
353                         break;
354
355                 default:                                                                                // need to pick one and adjust rr counts
356                         idx = rrg->ep_idx++ % rrg->nused;                       // see note above
357                         ep = rrg->epts[idx];                                            // select next endpoint
358                         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "_rr returning socket with multiple choices in group idx=%d \n", rrg->ep_idx );
359                         state = idx + 1;                                                        // unit test checks to see that we're cycling through, so must not just be TRUE
360                         break;
361         }
362
363         if( uepp != NULL ) {                                                            // caller may need refernce to endpoint too; give it if pointer supplied
364                 *uepp = ep;
365         }
366         if( state ) {                                                                           // end point selected, open if not, get socket either way
367                 if( ! ep->open ) {                                                              // not connected
368                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr selected endpoint not yet open; opening %s\n", ep->name );
369                         if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
370                                 ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
371                         }
372
373                         if( uta_link2( ctx, ep ) ) {                                                                                    // find entry in table and create link
374                                 ep->open = TRUE;
375                                 *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
376                                 fd2ep_add( ctx, ep->nn_sock, ep );                              // map fd to ep for disc cleanup
377                         } else {
378                                 state = FALSE;
379                         }
380                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" );
381                 } else {
382                         *nn_sock = ep->nn_sock;
383                 }
384         }
385
386         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr returns state=%d\n", state );
387         return state;
388 }
389
390 /*
391         Given a message, use the meid field to find the owner endpoint for the meid.
392         The owner ep is then used to extract the socket through which the message
393         is sent. This returns TRUE if we found a socket and it was written to the
394         nn_sock pointer; false if we didn't.
395
396         We've been told that the meid is a string, thus we count on it being a nil
397         terminated set of bytes.
398
399         If we return false the caller's ep reference may NOT be valid or even nil.
400 */
401 static int epsock_meid( uta_ctx_t* ctx, route_table_t *rtable, rmr_mbuf_t* msg, int* nn_sock, endpoint_t** uepp ) {
402         endpoint_t*     ep;                             // seected end point
403         int     state = FALSE;                  // processing state
404         char*   meid;
405         si_ctx_t*       si_ctx;
406
407         if( PARANOID_CHECKS ) {
408                 if( ctx == NULL || (si_ctx = ctx->si_ctx) == NULL  ) {
409                         return FALSE;
410                 }
411         } else {
412                 si_ctx = ctx->si_ctx;
413         }
414
415         errno = 0;
416         if( ! nn_sock || msg == NULL || rtable == NULL ) {                      // missing stuff; bail fast
417                 errno = EINVAL;
418                 return FALSE;
419         }
420
421         meid = ((uta_mhdr_t *) msg->header)->meid;
422
423         ep = get_meid_owner( rtable, meid );
424         if( uepp != NULL ) {                                                            // caller needs refernce to endpoint too
425                 *uepp = ep;
426         }
427
428         if( ep == NULL ) {
429                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_meid: no ep in hash for (%s)\n", meid );
430                 return FALSE;
431         }
432
433         state = TRUE;
434         if( ! ep->open ) {                                                              // not connected
435                 if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
436                         ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
437                 }
438
439                 if( uta_link2( ctx, ep ) ) {                            // find entry in table and create link
440                         ep->open = TRUE;
441                         *nn_sock = ep->nn_sock;                                 // pass socket back to caller
442                 } else {
443                         state = FALSE;
444                 }
445                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_meid: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" );
446         } else {
447                 *nn_sock = ep->nn_sock;
448         }
449
450         return state;
451 }
452
453 /*
454         Finds the rtable entry which matches the key. Returns a nil pointer if
455         no entry is found. If try_alternate is set, then we will attempt
456         to find the entry with a key based only on the message type.
457 */
458 static inline rtable_ent_t*  uta_get_rte( route_table_t *rt, int sid, int mtype, int try_alt ) {
459         uint64_t key;                   // key is sub id and mtype banged together
460         rtable_ent_t* rte;              // the entry we found
461
462         if( rt == NULL || rt->hash == NULL ) {
463                 return NULL;
464         }
465
466         key = build_rt_key( sid, mtype );                                                                                       // first try with a 'full' key
467         if( ((rte = rmr_sym_pull( rt->hash, key )) != NULL)  ||  ! try_alt ) {          // found or not allowed to try the alternate, return what we have
468                 return rte;
469         }
470
471         if( sid != UNSET_SUBID ) {                                                              // not found, and allowed to try alternate; and the sub_id was set
472                 key = build_rt_key( UNSET_SUBID, mtype );                       // rebuild key
473                 rte = rmr_sym_pull( rt->hash, key );                            // see what we get with this
474         }
475
476         return rte;
477 }
478
479 /*
480         Return a string of count information. E.g.:
481                 <ep-name>:<port> <good> <hard-fail> <soft-fail>
482
483         Caller must free the string allocated if a buffer was not provided.
484
485         Pointer returned is to a freshly allocated string, or the user buffer
486         for convenience.
487
488         If the endpoint passed is a nil pointer, then we return a nil -- caller
489         must check!
490 */
491 static inline char* get_ep_counts( endpoint_t* ep, char* ubuf, int ubuf_len ) {
492         char*   rs;                     // result string
493
494         if( ep == NULL ) {
495                 return NULL;
496         }
497
498         if( ubuf != NULL ) {
499                 rs = ubuf;
500         } else {
501                 ubuf_len = 256;
502                 rs = malloc( sizeof( char ) * ubuf_len );
503         }
504
505         snprintf( rs, ubuf_len, "%s %lld %lld %lld", ep->name, ep->scounts[EPSC_GOOD], ep->scounts[EPSC_FAIL], ep->scounts[EPSC_TRANS] );
506
507         return rs;
508 }
509
510
511 // ---- fd to ep functions --------------------------------------------------------------------------
512
513 /*
514         Create the hash which maps file descriptors to endpoints. We need this
515         to easily mark an endpoint as disconnected when we are notified. Thus we
516         expect these to be driven very seldomly; locking should not be an issue.
517         Locking is needed to prevent problems when the user application is multi-
518         threaded and attempting to (re)connect from concurrent threads.
519 */
520 static void fd2ep_init( uta_ctx_t* ctx ) {
521
522         if( ctx  && ! ctx->fd2ep ) {
523                 ctx->fd2ep = rmr_sym_alloc( 129 );
524
525                 if( ctx->fd2ep_gate == NULL ) {
526                         ctx->fd2ep_gate = (pthread_mutex_t *) malloc( sizeof( *ctx->fd2ep_gate ) );
527                         if( ctx->fd2ep_gate != NULL ) {
528                                 pthread_mutex_init( ctx->fd2ep_gate, NULL );
529                         }
530                 }
531         }
532 }
533
534 /*
535         Add an entry into the fd2ep hash to map the FD to the endpoint.
536 */
537 static void fd2ep_add( uta_ctx_t* ctx, int fd, endpoint_t* ep ) {
538         if( ctx && ctx->fd2ep ) {
539                 pthread_mutex_lock( ctx->fd2ep_gate );
540
541                 rmr_sym_map( ctx->fd2ep, (uint64_t) fd, (void *) ep );
542
543                 pthread_mutex_unlock( ctx->fd2ep_gate );
544         }
545 }
546
547 /*
548         Given a file descriptor this fetches the related endpoint from the hash and
549         deletes the entry from the hash (when we detect a disconnect).
550
551         This will also set the state on the ep open to false, and revoke the
552         FD (nn_socket).
553 */
554 static endpoint_t*  fd2ep_del( uta_ctx_t* ctx, int fd ) {
555         endpoint_t* ep = NULL;
556
557         if( ctx && ctx->fd2ep ) {
558                 ep = rmr_sym_pull(  ctx->fd2ep, (uint64_t) fd );
559                 if( ep ) {
560                         pthread_mutex_lock( ctx->fd2ep_gate );
561
562                         rmr_sym_ndel(  ctx->fd2ep, (uint64_t) fd );
563
564                         pthread_mutex_unlock( ctx->fd2ep_gate );
565                 }
566         }
567
568         return ep;
569 }
570
571 /*
572         Given a file descriptor fetches the related endpoint from the hash.
573         Returns nil if there is no reference in the hash.
574 */
575 static endpoint_t*  fd2ep_get( uta_ctx_t* ctx, int fd ) {
576         endpoint_t* ep = NULL;
577
578         if( ctx && ctx->fd2ep ) {
579                 pthread_mutex_lock( ctx->fd2ep_gate );
580
581                 ep = rmr_sym_pull(  ctx->fd2ep, (uint64_t) fd );
582
583                 pthread_mutex_unlock( ctx->fd2ep_gate );
584         }
585
586         return ep;
587 }
588
589
590 #endif