if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
return SI_RET_OK;
}
-
+
if( fd >= ctx->nrivers || fd < 0 ) {
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
return SI_RET_OK;
while( remain > 0 ) { // until we've done something with all bytes passed in
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
- // FIX ME: size in the message needs to be network byte order
+ // FIX ME: size in the message needs to be network byte order
if( river->msg_size <= 0 ) { // don't have a size yet
// FIX ME: we need a frame indicator to ensure alignment
need = sizeof( int ) - river->ipt; // what we need from transport buffer
river->ipt += need;
bidx += need;
remain -= need;
- river->msg_size = *((int *) river->accum);
+ river->msg_size = *((int *) river->accum);
if( DEBUG > 1 ) {
rmr_vlog( RMR_VL_DEBUG, "size from accumulator =%d\n", river->msg_size );
if( river->msg_size > 500 ) {
buf2mbuf( ctx, river->accum, river->nbytes, fd ); // build an RMR mbuf and queue
river->accum = (char *) malloc( sizeof( char ) * river->nbytes ); // fresh accumulator
} else {
- if( !(river->flags & RF_NOTIFIED) ) {
+ if( !(river->flags & RF_NOTIFIED) ) {
rmr_vlog( RMR_VL_WARN, "message larger than max (%d) have arrived on fd %d\n", river->nbytes, fd );
river->flags |= RF_NOTIFIED;
}
river->msg_size = -1;
river->ipt = 0;
bidx += need;
- remain -= need;
+ remain -= need;
}
}
}
/*
- Callback driven on a disconnect notification. We will attempt to find the related
- endpoint via the fd2ep hash maintained in the context. If we find it, then we
+ Callback driven on a disconnect notification. We will attempt to find the related
+ endpoint via the fd2ep hash maintained in the context. If we find it, then we
remove it from the hash, and mark the endpoint as closed so that the next attempt
to send forces a reconnect attempt.
return SI_RET_OK;
}
- ep = fd2ep_del( ctx, fd ); // find ep and remote the fd from the hash
- if( ep ) {
+ ep = fd2ep_del( ctx, fd ); // find ep and remove the fd from the hash
+ if( ep != NULL ) {
+ pthread_mutex_lock( &ep->gate ); // wise to lock this
ep->open = FALSE;
ep->nn_sock = -1;
+ pthread_mutex_unlock( &ep->gate );
}
return SI_RET_OK;
rmr_set_vlevel( RMR_VL_INFO ); // we WILL announce our version etc
if( ! announced ) {
- rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/f mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
+ rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/g mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
announced = 1;
}
return NULL;
}
+ // finish all flag setting before threads to keep helgrind quiet
+ ctx->flags |= CFL_MTC_ENABLED; // for SI threaded receiver is the only way
+
if( flags & FL_NOTHREAD ) { // thread set to off; no rout table collector started (could be called by the rtc thread itself)
ctx->rtable = rt_clone_space( NULL, NULL, 0 ); // creates an empty route table so that wormholes still can be used
} else {
}
}
- ctx->flags |= CFL_MTC_ENABLED; // for SI threaded receiver is the only way
if( pthread_create( &ctx->mtc_th, NULL, mt_receive, (void *) ctx ) ) { // so kick it
rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
}
get things into a bad state if we allow a collision here. The lock grab
only happens on the intial session setup.
*/
-static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) {
+//static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) {
+static int uta_link2( uta_ctx_t *ctx, endpoint_t* ep ) {
static int flags = 0;
-
char* target;
char conn_info[SI_MAX_ADDR_LEN]; // string to give to nano to make the connection
char* addr;
snprintf( conn_info, sizeof( conn_info ), "%s", target );
errno = 0;
if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "link2 attempting connection with: %s\n", conn_info );
- if( (ep->nn_sock = SIconnect( si_ctx, conn_info )) < 0 ) {
+ if( (ep->nn_sock = SIconnect( ctx->si_ctx, conn_info )) < 0 ) {
pthread_mutex_unlock( &ep->gate );
if( ep->notify ) { // need to notify if set
if( DEBUG ) rmr_vlog( RMR_VL_INFO, "rmr_si_link2: connection was successful to: %s\n", target );
ep->open = TRUE; // set open/notify before giving up lock
+ fd2ep_add( ctx, ep->nn_sock, ep ); // map fd to ep for disc cleanup (while we have the lock)
if( ! ep->notify ) { // if we yammered about a failure, indicate finally good
rmr_vlog( RMR_VL_INFO, "rmr: link2: connection finally establisehd with target: %s\n", target );
return FALSE;
}
- uta_link2( ctx->si_ctx, ep );
- if( ep->open ) {
- fd2ep_add( ctx, ep->nn_sock, ep ); // map fd to ep for disc cleanup
- }
+ uta_link2( ctx, ep );
return ep->open;
}
endpoint cannot be found false (0) is returned.
*/
static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpoint_t** uepp ) {
- route_table_t* rt;
+ route_table_t* rt;
si_ctx_t* si_ctx;
endpoint_t* ep;
int state = FALSE;
if( ep->addr == NULL ) { // name didn't resolve before, try again
ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup
}
- if( uta_link2( si_ctx, ep ) ) { // find entry in table and create link
+ if( uta_link2( ctx, ep ) ) { // find entry in table and create link
state = TRUE;
ep->open = TRUE;
*nn_sock = ep->nn_sock; // pass socket back to caller
ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup
}
- if( uta_link2( si_ctx, ep ) ) { // find entry in table and create link
+ if( uta_link2( ctx, ep ) ) { // find entry in table and create link
ep->open = TRUE;
*nn_sock = ep->nn_sock; // pass socket back to caller
fd2ep_add( ctx, ep->nn_sock, ep ); // map fd to ep for disc cleanup
ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup
}
- if( uta_link2( si_ctx, ep ) ) { // find entry in table and create link
+ if( uta_link2( ctx, ep ) ) { // find entry in table and create link
ep->open = TRUE;
*nn_sock = ep->nn_sock; // pass socket back to caller
} else {
/*
Finds the rtable entry which matches the key. Returns a nil pointer if
- no entry is found. If try_alternate is set, then we will attempt
+ no entry is found. If try_alternate is set, then we will attempt
to find the entry with a key based only on the message type.
*/
static inline rtable_ent_t* uta_get_rte( route_table_t *rt, int sid, int mtype, int try_alt ) {
/*
Create the hash which maps file descriptors to endpoints. We need this
- to easily mark an endpoint as disconnected when we are notified.
+ to easily mark an endpoint as disconnected when we are notified. Thus we
+ expect these to be driven very seldomly; locking should not be an issue.
+ Locking is needed to prevent problems when the user application is multi-
+ threaded and attempting to (re)connect from concurrent threads.
*/
static void fd2ep_init( uta_ctx_t* ctx ) {
+
if( ctx && ! ctx->fd2ep ) {
- ctx->fd2ep = rmr_sym_alloc( 129 );
+ ctx->fd2ep = rmr_sym_alloc( 129 );
+
+ if( ctx->fd2ep_gate == NULL ) {
+ ctx->fd2ep_gate = (pthread_mutex_t *) malloc( sizeof( *ctx->fd2ep_gate ) );
+ if( ctx->fd2ep_gate != NULL ) {
+ pthread_mutex_init( ctx->fd2ep_gate, NULL );
+ }
+ }
}
}
/*
- Add an entry into the fd2ep hash which points to the given ep.
+ Add an entry into the fd2ep hash to map the FD to the endpoint.
*/
static void fd2ep_add( uta_ctx_t* ctx, int fd, endpoint_t* ep ) {
if( ctx && ctx->fd2ep ) {
+ pthread_mutex_lock( ctx->fd2ep_gate );
+
rmr_sym_map( ctx->fd2ep, (uint64_t) fd, (void *) ep );
+
+ pthread_mutex_unlock( ctx->fd2ep_gate );
}
}
/*
- Given a file descriptor fetches the related endpoint from the hash and
- deletes the entry from the hash (when we detect a disconnect).
+ Given a file descriptor this fetches the related endpoint from the hash and
+ deletes the entry from the hash (when we detect a disconnect).
+
+ This will also set the state on the ep open to false, and revoke the
+ FD (nn_socket).
*/
static endpoint_t* fd2ep_del( uta_ctx_t* ctx, int fd ) {
endpoint_t* ep = NULL;
if( ctx && ctx->fd2ep ) {
ep = rmr_sym_pull( ctx->fd2ep, (uint64_t) fd );
if( ep ) {
+ pthread_mutex_lock( ctx->fd2ep_gate );
+
rmr_sym_ndel( ctx->fd2ep, (uint64_t) fd );
+
+ pthread_mutex_unlock( ctx->fd2ep_gate );
}
}
/*
Given a file descriptor fetches the related endpoint from the hash.
- Returns nil if there is no map.
+ Returns nil if there is no reference in the hash.
*/
static endpoint_t* fd2ep_get( uta_ctx_t* ctx, int fd ) {
endpoint_t* ep = NULL;
if( ctx && ctx->fd2ep ) {
+ pthread_mutex_lock( ctx->fd2ep_gate );
+
ep = rmr_sym_pull( ctx->fd2ep, (uint64_t) fd );
+
+ pthread_mutex_unlock( ctx->fd2ep_gate );
}
return ep;