Address complaints by code scanner
[ric-plt/lib/rmr.git] / src / rmr / si / src / rtable_si_static.c
1 // vim: ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rtable_si_static.c
23         Abstract:       Route table management functions which depend on the underlying
24                                 transport mechanism and thus cannot be included with the generic
25                                 route table functions.
26
27                                 This module is designed to be included by any module (main) needing
28                                 the static/private stuff.
29
30         Author:         E. Scott Daniels
31         Date:           29 November 2018
32 */
33
34 #ifndef rtable_static_c
35 #define rtable_static_c
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <netdb.h>
40 #include <errno.h>
41 #include <string.h>
42 #include <errno.h>
43 #include <fcntl.h>
44 #include <sys/types.h>
45 #include <sys/stat.h>
46 #include <unistd.h>
47
48
49 // -----------------------------------------------------------------------------------------------------
50
51 /*
52         Mark an endpoint closed because it's in a failing state.
53 */
54 static void uta_ep_failed( endpoint_t* ep ) {
55         if( ep != NULL ) {
56                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "connection to %s was closed\n", ep->name );
57                 ep->open = FALSE;
58         }
59 }
60
61 /*
62         Establish a TCP connection to the indicated target (IP address).
63         Target assumed to be address:port.  The new socket is returned via the
64         user supplied pointer so that a success/fail code is returned directly.
65         Return value is 0 (false) on failure, 1 (true)  on success.
66
67         In order to support multi-threaded user applications we must hold a lock before
68         we attempt to create a dialer and connect. NNG is thread safe, but we can
69         get things into a bad state if we allow a collision here.  The lock grab
70         only happens on the intial session setup.
71 */
72 //static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) {
73 static int uta_link2( uta_ctx_t *ctx, endpoint_t* ep ) {
74         static int      flags = 0;
75         char*           target;
76         char            conn_info[SI_MAX_ADDR_LEN];     // string to give to nano to make the connection
77         char*           addr;
78         int                     state = FALSE;
79         char*           tok;
80
81         if( ep == NULL ) {
82                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "link2 ep was nil!\n" );
83                 return FALSE;
84         }
85
86         target = ep->name;                              // always give name to transport so changing dest IP does not break reconnect
87         if( target == NULL  ||  (addr = strchr( target, ':' )) == NULL ) {              // bad address:port
88                 if( ep->notify ) {
89                         rmr_vlog( RMR_VL_WARN, "rmr: link2: unable to create link: bad target: %s\n", target == NULL ? "<nil>" : target );
90                         ep->notify = 0;
91                 }
92                 return FALSE;
93         }
94
95         pthread_mutex_lock( &ep->gate );                        // grab the lock
96         if( ep->open ) {
97                 pthread_mutex_unlock( &ep->gate );
98                 return TRUE;
99         }
100
101         snprintf( conn_info, sizeof( conn_info ), "%s", target );
102         errno = 0;
103         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "link2 attempting connection with: %s\n", conn_info );
104         if( (ep->nn_sock = SIconnect( ctx->si_ctx, conn_info )) < 0 ) {
105                 pthread_mutex_unlock( &ep->gate );
106
107                 if( ep->notify ) {                                                      // need to notify if set
108                         rmr_vlog( RMR_VL_WARN, "rmr: link2: unable to connect  to target: %s: %d %s\n", target, errno, strerror( errno ) );
109                         ep->notify = 0;
110                 }
111                 return FALSE;
112         }
113
114         if( DEBUG ) rmr_vlog( RMR_VL_INFO, "rmr_si_link2: connection was successful to: %s\n", target );
115
116         ep->open = TRUE;                                                // set open/notify before giving up lock
117         fd2ep_add( ctx, ep->nn_sock, ep );              // map fd to ep for disc cleanup (while we have the lock)
118
119         if( ! ep->notify ) {                                            // if we yammered about a failure, indicate finally good
120                 rmr_vlog( RMR_VL_INFO, "rmr: link2: connection finally establisehd with target: %s\n", target );
121                 ep->notify = 1;
122         }
123
124         pthread_mutex_unlock( &ep->gate );
125         return TRUE;
126 }
127
128 /*
129         This provides a protocol independent mechanism for establishing the connection to an endpoint.
130         Return is true (1) if the link was opened; false on error.
131 */
132 static int rt_link2_ep( void* vctx, endpoint_t* ep ) {
133         uta_ctx_t* ctx;
134
135         if( ep == NULL ) {
136                 return FALSE;
137         }
138
139         if( ep->open )  {                       // already open, do nothing
140                 return TRUE;
141         }
142
143         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
144                 return FALSE;
145         }
146
147         uta_link2( ctx, ep );
148         return ep->open;
149 }
150
151
152 /*
153         Add an endpoint to a route table entry for the group given. If the endpoint isn't in the
154         hash we add it and create the endpoint struct.
155
156         The caller must supply the specific route table (we assume it will be pending, but they
157         could live on the edge and update the active one, though that's not at all a good idea).
158 */
159 extern endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group  ) {
160         endpoint_t*     ep;
161         rrgroup_t* rrg;                         // pointer at group to update
162
163         if( ! rte || ! rt ) {
164                 rmr_vlog( RMR_VL_WARN, "uda_add_ep didn't get a valid rt and/or rte pointer\n" );
165                 return NULL;
166         }
167
168         if( rte->nrrgroups <= group || group < 0 ) {
169                 rmr_vlog( RMR_VL_WARN, "uda_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups );
170                 return NULL;
171         }
172
173         if( (rrg = rte->rrgroups[group]) == NULL ) {
174                 if( (rrg = (rrgroup_t *) malloc( sizeof( *rrg ) )) == NULL ) {
175                         rmr_vlog( RMR_VL_WARN, "rmr_add_ep: malloc failed for round robin group: group=%d\n", group );
176                         return NULL;
177                 }
178                 memset( rrg, 0, sizeof( *rrg ) );
179
180                 if( (rrg->epts = (endpoint_t **) malloc( sizeof( endpoint_t* ) * MAX_EP_GROUP )) == NULL ) {
181                         rmr_vlog( RMR_VL_WARN, "rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group );
182                         free( rrg );
183                         return NULL;
184                 }
185                 memset( rrg->epts, 0, sizeof( endpoint_t* ) * MAX_EP_GROUP );
186
187                 rte->rrgroups[group] = rrg;
188
189                 rrg->ep_idx = 0;                                                // next endpoint to send to
190                 rrg->nused = 0;                                                 // number populated
191                 rrg->nendpts = MAX_EP_GROUP;                    // number allocated
192
193                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rrg added to rte: mtype=%d group=%d\n", rte->mtype, group );
194         }
195
196         ep = rt_ensure_ep( rt, ep_name );                       // get the ep and create one if not known
197
198         if( rrg != NULL ) {
199                 if( rrg->nused >= rrg->nendpts ) {
200                         // future: reallocate
201                         rmr_vlog( RMR_VL_WARN, "endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group );
202                         return NULL;
203                 }
204
205                 rrg->epts[rrg->nused] = ep;
206                 rrg->nused++;
207         }
208
209         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "endpoint added to mtype/group: %d/%d %s nused=%d\n", rte->mtype, group, ep_name, rrg->nused );
210         return ep;
211 }
212
213
214 /*
215         Given a name, find the socket fd needed to send to it. Returns the socket via
216         the user pointer passed in and sets the return value to true (1). If the
217         endpoint cannot be found false (0) is returned.
218 */
219 static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpoint_t** uepp ) {
220         route_table_t*  rt = NULL;
221         si_ctx_t*               si_ctx = NULL;
222         endpoint_t*             ep;
223         int                             state = FALSE;
224
225         if( PARANOID_CHECKS ) {
226                 if( ctx == NULL ) {
227                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_byname: parinoia check pop ctx=%p rt=%p\n", ctx, rt );
228                         return FALSE;
229                 }
230                 rt = get_rt( ctx );                             // get active rt and bump ref count
231                 if( rt == NULL || (si_ctx = ctx->si_ctx) == NULL  ) {
232                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_byname: parinoia check pop rt=%p sictx=%p\n", rt, si_ctx );
233                         return FALSE;
234                 }
235         } else {
236                 rt = get_rt( ctx );                             // get active rt and bump ref count
237                 si_ctx = ctx->si_ctx;
238         }
239
240         ep =  rmr_sym_get( rt->ephash, ep_name, 1 );
241         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_byname: ep not found: %s\n", ep_name );
242         if( uepp != NULL ) {                                                    // caller needs endpoint too, give it back
243                 *uepp = ep;
244         }
245         if( ep == NULL ) {
246                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "get ep by name for %s not in hash!\n", ep_name );
247                 if( ! ep_name || (ep = rt_ensure_ep( rt, ep_name)) == NULL ) {                          // create one if not in rt (support rts without entry in our table)
248                         release_rt( ctx, rt );                                                  // drop ref count
249                         return FALSE;
250                 }
251         }
252         release_rt( ctx, rt );                                                                          // drop ref count
253
254         if( ! ep->open )  {                                                                             // not open -- connect now
255                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "get ep by name for %s session not started... starting\n", ep_name );
256                 if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
257                         ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
258                 }
259                 if( uta_link2( ctx, ep ) ) {                                                                                    // find entry in table and create link
260                         state = TRUE;
261                         ep->open = TRUE;
262                         *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
263                         fd2ep_add( ctx, ep->nn_sock, ep );                              // map fd to this ep for disc cleanup
264                 }
265                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_bn: connection state: %s %s\n", state ? "[OK]" : "[FAIL]", ep->name );
266         } else {
267                 *nn_sock = ep->nn_sock;
268                 state = TRUE;
269         }
270
271         return state;
272 }
273
274 /*
275         Make a round robin selection within a round robin group for a route table
276         entry. Returns the socket fd if there is a rte for the message
277         key, and group is defined. Socket is returned via pointer in the parm
278         list (nn_sock).
279
280         The group is the group number to select from.
281
282         The user supplied (via pointer to) integer 'more' will be set if there are
283         additional groups beyond the one selected. This allows the caller to
284         to easily iterate over the group list -- more is set when the group should
285         be incremented and the function invoked again. Groups start at 0.
286
287         The return value is true (>0) if the socket was found and *nn_sock was updated
288         and false (0) if there is no associated socket for the msg type, group combination.
289         We return the index+1 from the round robin table on success so that we can verify
290         during test that different entries are being seleted.
291
292         NOTE:   The round robin selection index increment might collide with other
293                 threads if multiple threads are attempting to send to the same round
294                 robin group; the consequences are small and avoid locking. The only side
295                 effect is either sending two messages in a row to, or skipping, an endpoint.
296                 Both of these, in the grand scheme of things, is minor compared to the
297                 overhead of grabbing a lock on each call.
298 */
299 static int uta_epsock_rr( uta_ctx_t* ctx, rtable_ent_t* rte, int group, int* more, int* nn_sock, endpoint_t** uepp ) {
300         si_ctx_t*               si_ctx;
301         endpoint_t*     ep;                             // selected end point
302         int  state = FALSE;                     // processing state
303         int dummy;
304         rrgroup_t* rrg;
305         int     idx;
306
307         if( PARANOID_CHECKS ) {
308                 if( ctx == NULL || (si_ctx = ctx->si_ctx) == NULL  ) {
309                         return FALSE;
310                 }
311         } else {
312                 si_ctx = ctx->si_ctx;
313         }
314
315         if( ! more ) {                          // eliminate cheks each time we need to use
316                 more = &dummy;
317         }
318
319         if( ! nn_sock ) {                       // user didn't supply a pointer
320                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr invalid nnsock pointer\n" );
321                 errno = EINVAL;
322                 *more = 0;
323                 return FALSE;
324         }
325
326         if( rte == NULL ) {
327                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr rte was nil; nothing selected\n" );
328                 *more = 0;
329                 return FALSE;
330         }
331
332         if( group < 0 || group >= rte->nrrgroups ) {
333                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "group out of range: group=%d max=%d\n", group, rte->nrrgroups );
334                 *more = 0;
335                 return FALSE;
336         }
337
338         if( (rrg = rte->rrgroups[group]) == NULL ) {
339                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rrg not found for group %d (ptr rrgroups[g] == nil)\n", group );
340                 *more = 0;                                      // groups are inserted contig, so nothing should be after a nil pointer
341                 return FALSE;
342         }
343
344         *more = group < rte->nrrgroups-1 ? (rte->rrgroups[group+1] != NULL): 0; // more if something in next group slot
345
346         switch( rrg->nused ) {
347                 case 0:                         // nothing allocated, just punt
348                         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "nothing allocated for the rrg\n" );
349                         return FALSE;
350
351                 case 1:                         // exactly one, no rr to deal with
352                         ep = rrg->epts[0];
353                         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "_rr returning socket with one choice in group \n" );
354                         state = TRUE;
355                         break;
356
357                 default:                                                                                // need to pick one and adjust rr counts
358                         idx = rrg->ep_idx++ % rrg->nused;                       // see note above
359                         ep = rrg->epts[idx];                                            // select next endpoint
360                         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "_rr returning socket with multiple choices in group idx=%d \n", rrg->ep_idx );
361                         state = idx + 1;                                                        // unit test checks to see that we're cycling through, so must not just be TRUE
362                         break;
363         }
364
365         if( uepp != NULL ) {                                                            // caller may need refernce to endpoint too; give it if pointer supplied
366                 *uepp = ep;
367         }
368         if( state ) {                                                                           // end point selected, open if not, get socket either way
369                 if( ! ep->open ) {                                                              // not connected
370                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr selected endpoint not yet open; opening %s\n", ep->name );
371                         if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
372                                 ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
373                         }
374
375                         if( uta_link2( ctx, ep ) ) {                                                                                    // find entry in table and create link
376                                 ep->open = TRUE;
377                                 *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
378                                 fd2ep_add( ctx, ep->nn_sock, ep );                              // map fd to ep for disc cleanup
379                         } else {
380                                 state = FALSE;
381                         }
382                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" );
383                 } else {
384                         *nn_sock = ep->nn_sock;
385                 }
386         }
387
388         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr returns state=%d\n", state );
389         return state;
390 }
391
392 /*
393         Given a message, use the meid field to find the owner endpoint for the meid.
394         The owner ep is then used to extract the socket through which the message
395         is sent. This returns TRUE if we found a socket and it was written to the
396         nn_sock pointer; false if we didn't.
397
398         We've been told that the meid is a string, thus we count on it being a nil
399         terminated set of bytes.
400
401         If we return false the caller's ep reference may NOT be valid or even nil.
402 */
403 static int epsock_meid( uta_ctx_t* ctx, route_table_t *rtable, rmr_mbuf_t* msg, int* nn_sock, endpoint_t** uepp ) {
404         endpoint_t*     ep;                             // seected end point
405         int     state = FALSE;                  // processing state
406         char*   meid;
407         si_ctx_t*       si_ctx;
408
409         if( PARANOID_CHECKS ) {
410                 if( ctx == NULL || (si_ctx = ctx->si_ctx) == NULL  ) {
411                         return FALSE;
412                 }
413         } else {
414                 si_ctx = ctx->si_ctx;
415         }
416
417         errno = 0;
418         if( ! nn_sock || msg == NULL || rtable == NULL ) {                      // missing stuff; bail fast
419                 errno = EINVAL;
420                 return FALSE;
421         }
422
423         meid = ((uta_mhdr_t *) msg->header)->meid;
424
425         ep = get_meid_owner( rtable, meid );
426         if( uepp != NULL ) {                                                            // caller needs refernce to endpoint too
427                 *uepp = ep;
428         }
429
430         if( ep == NULL ) {
431                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_meid: no ep in hash for (%s)\n", meid );
432                 return FALSE;
433         }
434
435         state = TRUE;
436         if( ! ep->open ) {                                                              // not connected
437                 if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
438                         ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
439                 }
440
441                 if( uta_link2( ctx, ep ) ) {                            // find entry in table and create link
442                         ep->open = TRUE;
443                         *nn_sock = ep->nn_sock;                                 // pass socket back to caller
444                 } else {
445                         state = FALSE;
446                 }
447                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_meid: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" );
448         } else {
449                 *nn_sock = ep->nn_sock;
450         }
451
452         return state;
453 }
454
455 /*
456         Finds the rtable entry which matches the key. Returns a nil pointer if
457         no entry is found. If try_alternate is set, then we will attempt
458         to find the entry with a key based only on the message type.
459 */
460 static inline rtable_ent_t*  uta_get_rte( route_table_t *rt, int sid, int mtype, int try_alt ) {
461         uint64_t key;                   // key is sub id and mtype banged together
462         rtable_ent_t* rte;              // the entry we found
463
464         if( rt == NULL || rt->hash == NULL ) {
465                 return NULL;
466         }
467
468         key = build_rt_key( sid, mtype );                                                                                       // first try with a 'full' key
469         if( ((rte = rmr_sym_pull( rt->hash, key )) != NULL)  ||  ! try_alt ) {          // found or not allowed to try the alternate, return what we have
470                 return rte;
471         }
472
473         if( sid != UNSET_SUBID ) {                                                              // not found, and allowed to try alternate; and the sub_id was set
474                 key = build_rt_key( UNSET_SUBID, mtype );                       // rebuild key
475                 rte = rmr_sym_pull( rt->hash, key );                            // see what we get with this
476         }
477
478         return rte;
479 }
480
481 /*
482         Return a string of count information. E.g.:
483                 <ep-name>:<port> <good> <hard-fail> <soft-fail>
484
485         Caller must free the string allocated if a buffer was not provided.
486
487         Pointer returned is to a freshly allocated string, or the user buffer
488         for convenience.
489
490         If the endpoint passed is a nil pointer, then we return a nil -- caller
491         must check!
492 */
493 static inline char* get_ep_counts( endpoint_t* ep, char* ubuf, int ubuf_len ) {
494         char*   rs;                     // result string
495
496         if( ep == NULL ) {
497                 return NULL;
498         }
499
500         if( ubuf != NULL ) {
501                 rs = ubuf;
502         } else {
503                 ubuf_len = 256;
504                 rs = malloc( sizeof( char ) * ubuf_len );
505         }
506
507         snprintf( rs, ubuf_len, "%s %lld %lld %lld", ep->name, ep->scounts[EPSC_GOOD], ep->scounts[EPSC_FAIL], ep->scounts[EPSC_TRANS] );
508
509         return rs;
510 }
511
512
513 // ---- fd to ep functions --------------------------------------------------------------------------
514
515 /*
516         Create the hash which maps file descriptors to endpoints. We need this
517         to easily mark an endpoint as disconnected when we are notified. Thus we
518         expect these to be driven very seldomly; locking should not be an issue.
519         Locking is needed to prevent problems when the user application is multi-
520         threaded and attempting to (re)connect from concurrent threads.
521 */
522 static void fd2ep_init( uta_ctx_t* ctx ) {
523
524         if( ctx  && ! ctx->fd2ep ) {
525                 ctx->fd2ep = rmr_sym_alloc( 129 );
526
527                 if( ctx->fd2ep_gate == NULL ) {
528                         ctx->fd2ep_gate = (pthread_mutex_t *) malloc( sizeof( *ctx->fd2ep_gate ) );
529                         if( ctx->fd2ep_gate != NULL ) {
530                                 pthread_mutex_init( ctx->fd2ep_gate, NULL );
531                         }
532                 }
533         }
534 }
535
536 /*
537         Add an entry into the fd2ep hash to map the FD to the endpoint.
538 */
539 static void fd2ep_add( uta_ctx_t* ctx, int fd, endpoint_t* ep ) {
540         if( ctx && ctx->fd2ep ) {
541                 pthread_mutex_lock( ctx->fd2ep_gate );
542
543                 rmr_sym_map( ctx->fd2ep, (uint64_t) fd, (void *) ep );
544
545                 pthread_mutex_unlock( ctx->fd2ep_gate );
546         }
547 }
548
549 /*
550         Given a file descriptor this fetches the related endpoint from the hash and
551         deletes the entry from the hash (when we detect a disconnect).
552
553         This will also set the state on the ep open to false, and revoke the
554         FD (nn_socket).
555 */
556 static endpoint_t*  fd2ep_del( uta_ctx_t* ctx, int fd ) {
557         endpoint_t* ep = NULL;
558
559         if( ctx && ctx->fd2ep ) {
560                 ep = rmr_sym_pull(  ctx->fd2ep, (uint64_t) fd );
561                 if( ep ) {
562                         pthread_mutex_lock( ctx->fd2ep_gate );
563
564                         rmr_sym_ndel(  ctx->fd2ep, (uint64_t) fd );
565
566                         pthread_mutex_unlock( ctx->fd2ep_gate );
567                 }
568         }
569
570         return ep;
571 }
572
573 /*
574         Given a file descriptor fetches the related endpoint from the hash.
575         Returns nil if there is no reference in the hash.
576 */
577 static endpoint_t*  fd2ep_get( uta_ctx_t* ctx, int fd ) {
578         endpoint_t* ep = NULL;
579
580         if( ctx && ctx->fd2ep ) {
581                 pthread_mutex_lock( ctx->fd2ep_gate );
582
583                 ep = rmr_sym_pull(  ctx->fd2ep, (uint64_t) fd );
584
585                 pthread_mutex_unlock( ctx->fd2ep_gate );
586         }
587
588         return ep;
589 }
590
591
592 #endif