*/
static void uta_ep_failed( endpoint_t* ep ) {
	if( ep != NULL ) {
		if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "connection to %s was closed\n", ep->name );
		ep->open = FALSE;					// mark closed so the next send attempt drives a reconnect
	}
}
get things into a bad state if we allow a collision here. The lock grab
only happens on the initial session setup.
*/
-static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) {
+//static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) {
+static int uta_link2( uta_ctx_t *ctx, endpoint_t* ep ) {
static int flags = 0;
-
char* target;
char conn_info[SI_MAX_ADDR_LEN]; // string to give to nano to make the connection
char* addr;
char* tok;
if( ep == NULL ) {
- if( DEBUG ) fprintf( stderr, "[DBUG] link2 ep was nil!\n" );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "link2 ep was nil!\n" );
return FALSE;
}
target = ep->name; // always give name to transport so changing dest IP does not break reconnect
if( target == NULL || (addr = strchr( target, ':' )) == NULL ) { // bad address:port
if( ep->notify ) {
- fprintf( stderr, "[WARN] rmr: link2: unable to create link: bad target: %s\n", target == NULL ? "<nil>" : target );
+ rmr_vlog( RMR_VL_WARN, "rmr: link2: unable to create link: bad target: %s\n", target == NULL ? "<nil>" : target );
ep->notify = 0;
}
return FALSE;
snprintf( conn_info, sizeof( conn_info ), "%s", target );
errno = 0;
- if( DEBUG > 1 ) fprintf( stderr, "[DBUG] link2 attempting connection with: %s\n", conn_info );
- if( (ep->nn_sock = SIconnect( si_ctx, conn_info )) < 0 ) {
+ if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "link2 attempting connection with: %s\n", conn_info );
+ if( (ep->nn_sock = SIconnect( ctx->si_ctx, conn_info )) < 0 ) {
pthread_mutex_unlock( &ep->gate );
if( ep->notify ) { // need to notify if set
- fprintf( stderr, "[WRN] rmr: link2: unable to connect to target: %s: %d %s\n", target, errno, strerror( errno ) );
+ rmr_vlog( RMR_VL_WARN, "rmr: link2: unable to connect to target: %s: %d %s\n", target, errno, strerror( errno ) );
ep->notify = 0;
}
- //nng_close( *nn_sock );
return FALSE;
}
- if( DEBUG ) fprintf( stderr, "[INFO] rmr_si_link2: connection was successful to: %s\n", target );
+ if( DEBUG ) rmr_vlog( RMR_VL_INFO, "rmr_si_link2: connection was successful to: %s\n", target );
ep->open = TRUE; // set open/notify before giving up lock
+ fd2ep_add( ctx, ep->nn_sock, ep ); // map fd to ep for disc cleanup (while we have the lock)
if( ! ep->notify ) { // if we yammered about a failure, indicate finally good
- fprintf( stderr, "[INFO] rmr: link2: connection finally establisehd with target: %s\n", target );
+ rmr_vlog( RMR_VL_INFO, "rmr: link2: connection finally establisehd with target: %s\n", target );
ep->notify = 1;
}
return FALSE;
}
- uta_link2( ctx->si_ctx, ep );
+ uta_link2( ctx, ep );
return ep->open;
}
rrgroup_t* rrg; // pointer at group to update
if( ! rte || ! rt ) {
- fprintf( stderr, "[WRN] uda_add_ep didn't get a valid rt and/or rte pointer\n" );
+ rmr_vlog( RMR_VL_WARN, "uda_add_ep didn't get a valid rt and/or rte pointer\n" );
return NULL;
}
if( rte->nrrgroups <= group || group < 0 ) {
- fprintf( stderr, "[WRN] uda_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups );
+ rmr_vlog( RMR_VL_WARN, "uda_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups );
return NULL;
}
//fprintf( stderr, ">>>> add ep grp=%d to rte @ 0x%p rrg=%p\n", group, rte, rte->rrgroups[group] );
if( (rrg = rte->rrgroups[group]) == NULL ) {
if( (rrg = (rrgroup_t *) malloc( sizeof( *rrg ) )) == NULL ) {
- fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for round robin group: group=%d\n", group );
+ rmr_vlog( RMR_VL_WARN, "rmr_add_ep: malloc failed for round robin group: group=%d\n", group );
return NULL;
}
memset( rrg, 0, sizeof( *rrg ) );
if( (rrg->epts = (endpoint_t **) malloc( sizeof( endpoint_t ) * MAX_EP_GROUP )) == NULL ) {
- fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group );
+ rmr_vlog( RMR_VL_WARN, "rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group );
return NULL;
}
memset( rrg->epts, 0, sizeof( endpoint_t ) * MAX_EP_GROUP );
rrg->nused = 0; // number populated
rrg->nendpts = MAX_EP_GROUP; // number allocated
- if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rrg added to rte: mtype=%d group=%d\n", rte->mtype, group );
+ if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rrg added to rte: mtype=%d group=%d\n", rte->mtype, group );
}
ep = rt_ensure_ep( rt, ep_name ); // get the ep and create one if not known
if( rrg != NULL ) {
if( rrg->nused >= rrg->nendpts ) {
// future: reallocate
- fprintf( stderr, "[WRN] endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group );
+ rmr_vlog( RMR_VL_WARN, "endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group );
return NULL;
}
rrg->nused++;
}
- if( DEBUG > 1 ) fprintf( stderr, "[DBUG] endpoint added to mtype/group: %d/%d %s nused=%d\n", rte->mtype, group, ep_name, rrg->nused );
+ if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "endpoint added to mtype/group: %d/%d %s nused=%d\n", rte->mtype, group, ep_name, rrg->nused );
return ep;
}
the user pointer passed in and sets the return value to true (1). If the
endpoint cannot be found false (0) is returned.
*/
-static int uta_epsock_byname( route_table_t* rt, char* ep_name, int* nn_sock, endpoint_t** uepp, si_ctx_t* si_ctx ) {
- endpoint_t* ep;
- int state = FALSE;
-
- if( rt == NULL ) {
- return FALSE;
+static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpoint_t** uepp ) {
+ route_table_t* rt;
+ si_ctx_t* si_ctx;
+ endpoint_t* ep;
+ int state = FALSE;
+
+ if( PARINOID_CHECKS ) {
+ if( ctx == NULL || (rt = ctx->rtable) == NULL || (si_ctx = ctx->si_ctx) == NULL ) {
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_byname: parinoia check pop ctx=%p rt=%p\n", ctx, rt );
+ return FALSE;
+ }
+ } else {
+ rt = ctx->rtable; // faster but more risky
+ si_ctx = ctx->si_ctx;
}
ep = rmr_sym_get( rt->hash, ep_name, 1 );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_byname: ep not found: %s\n", ep_name );
if( uepp != NULL ) { // caller needs endpoint too, give it back
*uepp = ep;
}
if( ep == NULL ) {
- if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s not in hash!\n", ep_name );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "get ep by name for %s not in hash!\n", ep_name );
if( ! ep_name || (ep = rt_ensure_ep( rt, ep_name)) == NULL ) { // create one if not in rt (support rts without entry in our table)
return FALSE;
}
}
if( ! ep->open ) { // not open -- connect now
- if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s session not started... starting\n", ep_name );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "get ep by name for %s session not started... starting\n", ep_name );
if( ep->addr == NULL ) { // name didn't resolve before, try again
ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup
}
- if( uta_link2( si_ctx, ep ) ) { // find entry in table and create link
+ if( uta_link2( ctx, ep ) ) { // find entry in table and create link
state = TRUE;
ep->open = TRUE;
*nn_sock = ep->nn_sock; // pass socket back to caller
+ fd2ep_add( ctx, ep->nn_sock, ep ); // map fd to this ep for disc cleanup
}
- if( DEBUG ) fprintf( stderr, "[DBUG] epsock_bn: connection state: %s %s\n", state ? "[OK]" : "[FAIL]", ep->name );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_bn: connection state: %s %s\n", state ? "[OK]" : "[FAIL]", ep->name );
} else {
*nn_sock = ep->nn_sock;
state = TRUE;
Both of these, in the grand scheme of things, is minor compared to the
overhead of grabbing a lock on each call.
*/
-static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, int* nn_sock, endpoint_t** uepp, si_ctx_t* si_ctx ) {
- endpoint_t* ep; // seected end point
+static int uta_epsock_rr( uta_ctx_t* ctx, rtable_ent_t* rte, int group, int* more, int* nn_sock, endpoint_t** uepp ) {
+ si_ctx_t* si_ctx;
+ endpoint_t* ep; // selected end point
int state = FALSE; // processing state
int dummy;
rrgroup_t* rrg;
int idx;
+ if( PARINOID_CHECKS ) {
+ if( ctx == NULL || (si_ctx = ctx->si_ctx) == NULL ) {
+ return FALSE;
+ }
+ } else {
+ si_ctx = ctx->si_ctx;
+ }
+
//fprintf( stderr, ">>>> epsock_rr selecting: grp=%d mtype=%d ngrps=%d\n", group, rte->mtype, rte->nrrgroups );
if( ! more ) { // eliminate cheks each time we need to use
}
if( ! nn_sock ) { // user didn't supply a pointer
- if( DEBUG ) fprintf( stderr, "[DBUG] epsock_rr invalid nnsock pointer\n" );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr invalid nnsock pointer\n" );
errno = EINVAL;
*more = 0;
return FALSE;
}
if( rte == NULL ) {
- if( DEBUG ) fprintf( stderr, "[DBUG] epsock_rr rte was nil; nothing selected\n" );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr rte was nil; nothing selected\n" );
*more = 0;
return FALSE;
}
if( group < 0 || group >= rte->nrrgroups ) {
- if( DEBUG > 1 ) fprintf( stderr, "[DBUG] group out of range: group=%d max=%d\n", group, rte->nrrgroups );
+ if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "group out of range: group=%d max=%d\n", group, rte->nrrgroups );
*more = 0;
return FALSE;
}
if( (rrg = rte->rrgroups[group]) == NULL ) {
- if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rrg not found for group %d (ptr rrgroups[g] == nil)\n", group );
+ if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rrg not found for group %d (ptr rrgroups[g] == nil)\n", group );
*more = 0; // groups are inserted contig, so nothing should be after a nil pointer
return FALSE;
}
switch( rrg->nused ) {
case 0: // nothing allocated, just punt
- if( DEBUG > 1 ) fprintf( stderr, "[DBUG] nothing allocated for the rrg\n" );
+ if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "nothing allocated for the rrg\n" );
return FALSE;
case 1: // exactly one, no rr to deal with
ep = rrg->epts[0];
- if( DEBUG > 1 ) fprintf( stderr, "[DBUG] _rr returning socket with one choice in group \n" );
+ if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "_rr returning socket with one choice in group \n" );
state = TRUE;
break;
default: // need to pick one and adjust rr counts
idx = rrg->ep_idx++ % rrg->nused; // see note above
ep = rrg->epts[idx]; // select next endpoint
- if( DEBUG > 1 ) fprintf( stderr, "[DBUG] _rr returning socket with multiple choices in group idx=%d \n", rrg->ep_idx );
+ if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "_rr returning socket with multiple choices in group idx=%d \n", rrg->ep_idx );
state = idx + 1; // unit test checks to see that we're cycling through, so must not just be TRUE
break;
}
}
if( state ) { // end point selected, open if not, get socket either way
if( ! ep->open ) { // not connected
- if( DEBUG ) fprintf( stderr, "[DBUG] epsock_rr selected endpoint not yet open; opening %s\n", ep->name );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr selected endpoint not yet open; opening %s\n", ep->name );
if( ep->addr == NULL ) { // name didn't resolve before, try again
ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup
}
- if( uta_link2( si_ctx, ep ) ) { // find entry in table and create link
+ if( uta_link2( ctx, ep ) ) { // find entry in table and create link
ep->open = TRUE;
*nn_sock = ep->nn_sock; // pass socket back to caller
+ fd2ep_add( ctx, ep->nn_sock, ep ); // map fd to ep for disc cleanup
} else {
state = FALSE;
}
- if( DEBUG ) fprintf( stderr, "[DBUG] epsock_rr: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" );
} else {
*nn_sock = ep->nn_sock;
}
}
- if( DEBUG > 1 ) fprintf( stderr, "[DBUG] epsock_rr returns state=%d\n", state );
+ if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr returns state=%d\n", state );
+ return state;
+}
+
+/*
+ Given a message, use the meid field to find the owner endpoint for the meid.
+ The owner ep is then used to extract the socket through which the message
+ is sent. This returns TRUE if we found a socket and it was written to the
+ nn_sock pointer; false if we didn't.
+
+ We've been told that the meid is a string, thus we count on it being a nil
+ terminated set of bytes.
+
+ If we return false the caller's ep reference may NOT be valid or even nil.
+*/
+static int epsock_meid( uta_ctx_t* ctx, route_table_t *rtable, rmr_mbuf_t* msg, int* nn_sock, endpoint_t** uepp ) {
+ endpoint_t* ep; // seected end point
+ int state = FALSE; // processing state
+ char* meid;
+ si_ctx_t* si_ctx;
+
+ if( PARINOID_CHECKS ) {
+ if( ctx == NULL || (si_ctx = ctx->si_ctx) == NULL ) {
+ return FALSE;
+ }
+ } else {
+ si_ctx = ctx->si_ctx;
+ }
+
+ errno = 0;
+ if( ! nn_sock || msg == NULL || rtable == NULL ) { // missing stuff; bail fast
+ errno = EINVAL;
+ return FALSE;
+ }
+
+ meid = ((uta_mhdr_t *) msg->header)->meid;
+
+ ep = get_meid_owner( rtable, meid );
+ if( uepp != NULL ) { // caller needs refernce to endpoint too
+ *uepp = ep;
+ }
+
+ if( ep == NULL ) {
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_meid: no ep in hash for (%s)\n", meid );
+ return FALSE;
+ }
+
+ state = TRUE;
+ if( ! ep->open ) { // not connected
+ if( ep->addr == NULL ) { // name didn't resolve before, try again
+ ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup
+ }
+
+ if( uta_link2( ctx, ep ) ) { // find entry in table and create link
+ ep->open = TRUE;
+ *nn_sock = ep->nn_sock; // pass socket back to caller
+ } else {
+ state = FALSE;
+ }
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_meid: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" );
+ } else {
+ *nn_sock = ep->nn_sock;
+ }
+
return state;
}
/*
Finds the rtable entry which matches the key. Returns a nil pointer if
	no entry is found. If try_alternate is set, then we will attempt
to find the entry with a key based only on the message type.
*/
static inline rtable_ent_t* uta_get_rte( route_table_t *rt, int sid, int mtype, int try_alt ) {
return rs;
}
+
+// ---- fd to ep functions --------------------------------------------------------------------------
+
+/*
+ Create the hash which maps file descriptors to endpoints. We need this
+ to easily mark an endpoint as disconnected when we are notified. Thus we
+ expect these to be driven very seldomly; locking should not be an issue.
+ Locking is needed to prevent problems when the user application is multi-
+ threaded and attempting to (re)connect from concurrent threads.
+*/
+static void fd2ep_init( uta_ctx_t* ctx ) {
+
+ if( ctx && ! ctx->fd2ep ) {
+ ctx->fd2ep = rmr_sym_alloc( 129 );
+
+ if( ctx->fd2ep_gate == NULL ) {
+ ctx->fd2ep_gate = (pthread_mutex_t *) malloc( sizeof( *ctx->fd2ep_gate ) );
+ if( ctx->fd2ep_gate != NULL ) {
+ pthread_mutex_init( ctx->fd2ep_gate, NULL );
+ }
+ }
+ }
+}
+
+/*
+ Add an entry into the fd2ep hash to map the FD to the endpoint.
+*/
+static void fd2ep_add( uta_ctx_t* ctx, int fd, endpoint_t* ep ) {
+ if( ctx && ctx->fd2ep ) {
+ pthread_mutex_lock( ctx->fd2ep_gate );
+
+ rmr_sym_map( ctx->fd2ep, (uint64_t) fd, (void *) ep );
+
+ pthread_mutex_unlock( ctx->fd2ep_gate );
+ }
+}
+
+/*
+ Given a file descriptor this fetches the related endpoint from the hash and
+ deletes the entry from the hash (when we detect a disconnect).
+
+ This will also set the state on the ep open to false, and revoke the
+ FD (nn_socket).
+*/
+static endpoint_t* fd2ep_del( uta_ctx_t* ctx, int fd ) {
+ endpoint_t* ep = NULL;
+
+ if( ctx && ctx->fd2ep ) {
+ ep = rmr_sym_pull( ctx->fd2ep, (uint64_t) fd );
+ if( ep ) {
+ pthread_mutex_lock( ctx->fd2ep_gate );
+
+ rmr_sym_ndel( ctx->fd2ep, (uint64_t) fd );
+
+ pthread_mutex_unlock( ctx->fd2ep_gate );
+ }
+ }
+
+ return ep;
+}
+
+/*
+ Given a file descriptor fetches the related endpoint from the hash.
+ Returns nil if there is no reference in the hash.
+*/
+static endpoint_t* fd2ep_get( uta_ctx_t* ctx, int fd ) {
+ endpoint_t* ep = NULL;
+
+ if( ctx && ctx->fd2ep ) {
+ pthread_mutex_lock( ctx->fd2ep_gate );
+
+ ep = rmr_sym_pull( ctx->fd2ep, (uint64_t) fd );
+
+ pthread_mutex_unlock( ctx->fd2ep_gate );
+ }
+
+ return ep;
+}
+
+
#endif