get things into a bad state if we allow a collision here. The lock grab
only happens on the initial session setup.
*/
-static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) {
+//static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) {
+static int uta_link2( uta_ctx_t *ctx, endpoint_t* ep ) {
static int flags = 0;
-
char* target;
char conn_info[SI_MAX_ADDR_LEN]; // string to give to nano to make the connection
char* addr;
snprintf( conn_info, sizeof( conn_info ), "%s", target );
errno = 0;
if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "link2 attempting connection with: %s\n", conn_info );
- if( (ep->nn_sock = SIconnect( si_ctx, conn_info )) < 0 ) {
+ if( (ep->nn_sock = SIconnect( ctx->si_ctx, conn_info )) < 0 ) {
pthread_mutex_unlock( &ep->gate );
if( ep->notify ) { // need to notify if set
if( DEBUG ) rmr_vlog( RMR_VL_INFO, "rmr_si_link2: connection was successful to: %s\n", target );
ep->open = TRUE; // set open/notify before giving up lock
+ fd2ep_add( ctx, ep->nn_sock, ep ); // map fd to ep for disc cleanup (while we have the lock)
if( ! ep->notify ) { // if we yammered about a failure, indicate finally good
rmr_vlog( RMR_VL_INFO, "rmr: link2: connection finally establisehd with target: %s\n", target );
return FALSE;
}
- uta_link2( ctx->si_ctx, ep );
- if( ep->open ) {
- fd2ep_add( ctx, ep->nn_sock, ep ); // map fd to ep for disc cleanup
- }
+ uta_link2( ctx, ep );
return ep->open;
}
return NULL;
}
- //fprintf( stderr, ">>>> add ep grp=%d to rte @ 0x%p rrg=%p\n", group, rte, rte->rrgroups[group] );
if( (rrg = rte->rrgroups[group]) == NULL ) {
if( (rrg = (rrgroup_t *) malloc( sizeof( *rrg ) )) == NULL ) {
rmr_vlog( RMR_VL_WARN, "rmr_add_ep: malloc failed for round robin group: group=%d\n", group );
}
memset( rrg, 0, sizeof( *rrg ) );
- if( (rrg->epts = (endpoint_t **) malloc( sizeof( endpoint_t ) * MAX_EP_GROUP )) == NULL ) {
+ if( (rrg->epts = (endpoint_t **) malloc( sizeof( endpoint_t* ) * MAX_EP_GROUP )) == NULL ) {
rmr_vlog( RMR_VL_WARN, "rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group );
+ free( rrg );
return NULL;
}
- memset( rrg->epts, 0, sizeof( endpoint_t ) * MAX_EP_GROUP );
+ memset( rrg->epts, 0, sizeof( endpoint_t* ) * MAX_EP_GROUP );
rte->rrgroups[group] = rrg;
- //fprintf( stderr, ">>>> added new rrg grp=%d to rte @ 0x%p rrg=%p\n", group, rte, rte->rrgroups[group] );
rrg->ep_idx = 0; // next endpoint to send to
rrg->nused = 0; // number populated
if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rrg added to rte: mtype=%d group=%d\n", rte->mtype, group );
}
- ep = rt_ensure_ep( rt, ep_name ); // get the ep and create one if not known
+ ep = rt_ensure_ep( rt, ep_name ); // get the ep and create one if not known
if( rrg != NULL ) {
if( rrg->nused >= rrg->nendpts ) {
/*
- Given a name, find the nano socket needed to send to it. Returns the socket via
+ Given a name, find the socket fd needed to send to it. Returns the socket via
the user pointer passed in and sets the return value to true (1). If the
endpoint cannot be found false (0) is returned.
*/
static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpoint_t** uepp ) {
- route_table_t* rt;
- si_ctx_t* si_ctx;
+ route_table_t* rt = NULL;
+ si_ctx_t* si_ctx = NULL;
endpoint_t* ep;
int state = FALSE;
- if( PARINOID_CHECKS ) {
- if( ctx == NULL || (rt = ctx->rtable) == NULL || (si_ctx = ctx->si_ctx) == NULL ) {
+ if( PARANOID_CHECKS ) {
+ if( ctx == NULL ) {
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_byname: parinoia check pop ctx=%p rt=%p\n", ctx, rt );
+ return FALSE;
+ }
+ rt = get_rt( ctx ); // get active rt and bump ref count
+ if( rt == NULL || (si_ctx = ctx->si_ctx) == NULL ) {
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_byname: parinoia check pop rt=%p sictx=%p\n", rt, si_ctx );
return FALSE;
}
} else {
- rt = ctx->rtable; // faster but more risky
+ rt = get_rt( ctx ); // get active rt and bump ref count
si_ctx = ctx->si_ctx;
}
- ep = rmr_sym_get( rt->hash, ep_name, 1 );
+ ep = rmr_sym_get( rt->ephash, ep_name, 1 );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_byname: ep not found: %s\n", ep_name );
if( uepp != NULL ) { // caller needs endpoint too, give it back
*uepp = ep;
}
if( ep == NULL ) {
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "get ep by name for %s not in hash!\n", ep_name );
if( ! ep_name || (ep = rt_ensure_ep( rt, ep_name)) == NULL ) { // create one if not in rt (support rts without entry in our table)
+ release_rt( ctx, rt ); // drop ref count
return FALSE;
}
}
+ release_rt( ctx, rt ); // drop ref count
if( ! ep->open ) { // not open -- connect now
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "get ep by name for %s session not started... starting\n", ep_name );
if( ep->addr == NULL ) { // name didn't resolve before, try again
ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup
}
- if( uta_link2( si_ctx, ep ) ) { // find entry in table and create link
+ if( uta_link2( ctx, ep ) ) { // find entry in table and create link
state = TRUE;
ep->open = TRUE;
*nn_sock = ep->nn_sock; // pass socket back to caller
/*
Make a round robin selection within a round robin group for a route table
- entry. Returns the nanomsg socket if there is a rte for the message
+ entry. Returns the socket fd if there is a rte for the message
key, and group is defined. Socket is returned via pointer in the parm
list (nn_sock).
The return value is true (>0) if the socket was found and *nn_sock was updated
and false (0) if there is no associated socket for the msg type, group combination.
We return the index+1 from the round robin table on success so that we can verify
- during test that different entries are being seleted; we cannot depend on the nng
- socket being different as we could with nano.
+ during test that different entries are being selected.
NOTE: The round robin selection index increment might collide with other
threads if multiple threads are attempting to send to the same round
rrgroup_t* rrg;
int idx;
- if( PARINOID_CHECKS ) {
+ if( PARANOID_CHECKS ) {
if( ctx == NULL || (si_ctx = ctx->si_ctx) == NULL ) {
return FALSE;
}
si_ctx = ctx->si_ctx;
}
- //fprintf( stderr, ">>>> epsock_rr selecting: grp=%d mtype=%d ngrps=%d\n", group, rte->mtype, rte->nrrgroups );
-
if( ! more ) { // eliminate checks each time we need to use
more = &dummy;
}
ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup
}
- if( uta_link2( si_ctx, ep ) ) { // find entry in table and create link
+ if( uta_link2( ctx, ep ) ) { // find entry in table and create link
ep->open = TRUE;
*nn_sock = ep->nn_sock; // pass socket back to caller
fd2ep_add( ctx, ep->nn_sock, ep ); // map fd to ep for disc cleanup
char* meid;
si_ctx_t* si_ctx;
- if( PARINOID_CHECKS ) {
+ if( PARANOID_CHECKS ) {
if( ctx == NULL || (si_ctx = ctx->si_ctx) == NULL ) {
return FALSE;
}
ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup
}
- if( uta_link2( si_ctx, ep ) ) { // find entry in table and create link
+ if( uta_link2( ctx, ep ) ) { // find entry in table and create link
ep->open = TRUE;
*nn_sock = ep->nn_sock; // pass socket back to caller
} else {
/*
Finds the rtable entry which matches the key. Returns a nil pointer if
- no entry is found. If try_alternate is set, then we will attempt
+ no entry is found. If try_alternate is set, then we will attempt
to find the entry with a key based only on the message type.
*/
static inline rtable_ent_t* uta_get_rte( route_table_t *rt, int sid, int mtype, int try_alt ) {
/*
Create the hash which maps file descriptors to endpoints. We need this
- to easily mark an endpoint as disconnected when we are notified.
+ to easily mark an endpoint as disconnected when we are notified. Thus we
+ expect these to be driven very seldom; locking should not be an issue.
+ Locking is needed to prevent problems when the user application is multi-
+ threaded and attempting to (re)connect from concurrent threads.
*/
static void fd2ep_init( uta_ctx_t* ctx ) {
+
if( ctx && ! ctx->fd2ep ) {
- ctx->fd2ep = rmr_sym_alloc( 129 );
+ ctx->fd2ep = rmr_sym_alloc( 129 );
+
+ if( ctx->fd2ep_gate == NULL ) {
+ ctx->fd2ep_gate = (pthread_mutex_t *) malloc( sizeof( *ctx->fd2ep_gate ) );
+ if( ctx->fd2ep_gate != NULL ) {
+ pthread_mutex_init( ctx->fd2ep_gate, NULL );
+ }
+ }
}
}
/*
- Add an entry into the fd2ep hash which points to the given ep.
+ Add an entry into the fd2ep hash to map the FD to the endpoint.
*/
static void fd2ep_add( uta_ctx_t* ctx, int fd, endpoint_t* ep ) {
if( ctx && ctx->fd2ep ) {
+ pthread_mutex_lock( ctx->fd2ep_gate );
+
rmr_sym_map( ctx->fd2ep, (uint64_t) fd, (void *) ep );
+
+ pthread_mutex_unlock( ctx->fd2ep_gate );
}
}
/*
- Given a file descriptor fetches the related endpoint from the hash and
- deletes the entry from the hash (when we detect a disconnect).
+ Given a file descriptor this fetches the related endpoint from the hash and
+ deletes the entry from the hash (when we detect a disconnect).
+
+ This will also set the state on the ep open to false, and revoke the
+ FD (nn_socket).
*/
static endpoint_t* fd2ep_del( uta_ctx_t* ctx, int fd ) {
endpoint_t* ep = NULL;
if( ctx && ctx->fd2ep ) {
ep = rmr_sym_pull( ctx->fd2ep, (uint64_t) fd );
if( ep ) {
+ pthread_mutex_lock( ctx->fd2ep_gate );
+
rmr_sym_ndel( ctx->fd2ep, (uint64_t) fd );
+
+ pthread_mutex_unlock( ctx->fd2ep_gate );
}
}
/*
Given a file descriptor fetches the related endpoint from the hash.
- Returns nil if there is no map.
+ Returns nil if there is no reference in the hash.
*/
static endpoint_t* fd2ep_get( uta_ctx_t* ctx, int fd ) {
endpoint_t* ep = NULL;
if( ctx && ctx->fd2ep ) {
+ pthread_mutex_lock( ctx->fd2ep_gate );
+
ep = rmr_sym_pull( ctx->fd2ep, (uint64_t) fd );
+
+ pthread_mutex_unlock( ctx->fd2ep_gate );
}
return ep;