static void uta_ep_failed( endpoint_t* ep ) {
if( ep != NULL ) {
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "connection to %s was closed\n", ep->name );
- ep->open = 0;
+ ep->open = FALSE;
}
}
}
uta_link2( ctx->si_ctx, ep );
+ if( ep->open ) {
+ fd2ep_add( ctx, ep->nn_sock, ep ); // map fd to ep for disc cleanup
+ }
return ep->open;
}
the user pointer passed in and sets the return value to true (1). If the
endpoint cannot be found false (0) is returned.
*/
-static int uta_epsock_byname( route_table_t* rt, char* ep_name, int* nn_sock, endpoint_t** uepp, si_ctx_t* si_ctx ) {
- endpoint_t* ep;
- int state = FALSE;
-
- if( rt == NULL ) {
- return FALSE;
+static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpoint_t** uepp ) {
+ route_table_t* rt;
+ si_ctx_t* si_ctx;
+ endpoint_t* ep;
+ int state = FALSE;
+
+ if( PARINOID_CHECKS ) {
+ if( ctx == NULL || (rt = ctx->rtable) == NULL || (si_ctx = ctx->si_ctx) == NULL ) {
+ return FALSE;
+ }
+ } else {
+ rt = ctx->rtable; // faster but more risky
+ si_ctx = ctx->si_ctx;
}
ep = rmr_sym_get( rt->hash, ep_name, 1 );
state = TRUE;
ep->open = TRUE;
*nn_sock = ep->nn_sock; // pass socket back to caller
+ fd2ep_add( ctx, ep->nn_sock, ep ); // map fd to this ep for disc cleanup
}
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_bn: connection state: %s %s\n", state ? "[OK]" : "[FAIL]", ep->name );
} else {
Both of these, in the grand scheme of things, is minor compared to the
overhead of grabbing a lock on each call.
*/
-static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, int* nn_sock, endpoint_t** uepp, si_ctx_t* si_ctx ) {
- endpoint_t* ep; // seected end point
+static int uta_epsock_rr( uta_ctx_t* ctx, rtable_ent_t* rte, int group, int* more, int* nn_sock, endpoint_t** uepp ) {
+ si_ctx_t* si_ctx;
+ endpoint_t* ep; // selected end point
int state = FALSE; // processing state
int dummy;
rrgroup_t* rrg;
int idx;
+ if( PARINOID_CHECKS ) {
+ if( ctx == NULL || (si_ctx = ctx->si_ctx) == NULL ) {
+ return FALSE;
+ }
+ } else {
+ si_ctx = ctx->si_ctx;
+ }
+
//fprintf( stderr, ">>>> epsock_rr selecting: grp=%d mtype=%d ngrps=%d\n", group, rte->mtype, rte->nrrgroups );
if( ! more ) { // eliminate cheks each time we need to use
if( uta_link2( si_ctx, ep ) ) { // find entry in table and create link
ep->open = TRUE;
*nn_sock = ep->nn_sock; // pass socket back to caller
+ fd2ep_add( ctx, ep->nn_sock, ep ); // map fd to ep for disc cleanup
} else {
state = FALSE;
}
return rs;
}
+
+// ---- fd to ep functions --------------------------------------------------------------------------
+
+/*
+ Create the hash which maps file descriptors to endpoints. We need this
+ to easily mark an endpoint as disconnected when we are notified.
+*/
+static void fd2ep_init( uta_ctx_t* ctx ) {
+ if( ctx && ! ctx->fd2ep ) {
+ ctx->fd2ep = rmr_sym_alloc( 129 );
+ }
+}
+
+/*
+ Add an entry into the fd2ep hash which points to the given ep.
+*/
+static void fd2ep_add( uta_ctx_t* ctx, int fd, endpoint_t* ep ) {
+ if( ctx && ctx->fd2ep ) {
+ rmr_sym_map( ctx->fd2ep, (uint64_t) fd, (void *) ep );
+ }
+}
+
+/*
+ Given a file descriptor fetches the related endpoint from the hash and
+ deletes the entry from the hash (when we detect a disconnect).
+*/
+static endpoint_t* fd2ep_del( uta_ctx_t* ctx, int fd ) {
+ endpoint_t* ep = NULL;
+
+ if( ctx && ctx->fd2ep ) {
+ ep = rmr_sym_pull( ctx->fd2ep, (uint64_t) fd );
+ if( ep ) {
+ rmr_sym_ndel( ctx->fd2ep, (uint64_t) fd );
+ }
+ }
+
+ return ep;
+}
+
+/*
+ Given a file descriptor fetches the related endpoint from the hash.
+ Returns nil if there is no map.
+*/
+static endpoint_t* fd2ep_get( uta_ctx_t* ctx, int fd ) {
+ endpoint_t* ep = NULL;
+
+ if( ctx && ctx->fd2ep ) {
+ ep = rmr_sym_pull( ctx->fd2ep, (uint64_t) fd );
+ }
+
+ return ep;
+}
+
+
#endif