Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: I4c3d9d4191d0c994246c076861b52973413404a6
river_t* rivers; // inbound flows (index is the socket fd)
int max_ibm; // max size of an inbound message (river accum alloc size)
void* zcb_mring; // zero copy buffer mbuf ring
river_t* rivers; // inbound flows (index is the socket fd)
int max_ibm; // max size of an inbound message (river accum alloc size)
void* zcb_mring; // zero copy buffer mbuf ring
- void* fd2ep; // the symtab mapping file des to endpoints for cleanup on disconnect
+ void* fd2ep; // the symtab mapping file des to endpoints for cleanup on disconnect
+ pthread_mutex_t *fd2ep_gate; // we must gate add/deletes to the fd2 symtab
};
typedef uta_ctx_t uta_ctx;
};
typedef uta_ctx_t uta_ctx;
// --- rt table things ---------------------------
static void uta_ep_failed( endpoint_t* ep );
// --- rt table things ---------------------------
static void uta_ep_failed( endpoint_t* ep );
-static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep );
+static int uta_link2( uta_ctx_t *ctx, endpoint_t* ep );
+
static int rt_link2_ep( void* vctx, endpoint_t* ep );
static rtable_ent_t* uta_get_rte( route_table_t *rt, int sid, int mtype, int try_alt );
static inline int xlate_si_state( int state, int def_state );
static int rt_link2_ep( void* vctx, endpoint_t* ep );
static rtable_ent_t* uta_get_rte( route_table_t *rt, int sid, int mtype, int try_alt );
static inline int xlate_si_state( int state, int def_state );
if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
return SI_RET_OK;
}
if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
return SI_RET_OK;
}
if( fd >= ctx->nrivers || fd < 0 ) {
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
return SI_RET_OK;
if( fd >= ctx->nrivers || fd < 0 ) {
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
return SI_RET_OK;
while( remain > 0 ) { // until we've done something with all bytes passed in
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
while( remain > 0 ) { // until we've done something with all bytes passed in
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
- // FIX ME: size in the message needs to be network byte order
+ // FIX ME: size in the message needs to be network byte order
if( river->msg_size <= 0 ) { // don't have a size yet
// FIX ME: we need a frame indicator to ensure alignment
need = sizeof( int ) - river->ipt; // what we need from transport buffer
if( river->msg_size <= 0 ) { // don't have a size yet
// FIX ME: we need a frame indicator to ensure alignment
need = sizeof( int ) - river->ipt; // what we need from transport buffer
river->ipt += need;
bidx += need;
remain -= need;
river->ipt += need;
bidx += need;
remain -= need;
- river->msg_size = *((int *) river->accum);
+ river->msg_size = *((int *) river->accum);
if( DEBUG > 1 ) {
rmr_vlog( RMR_VL_DEBUG, "size from accumulator =%d\n", river->msg_size );
if( river->msg_size > 500 ) {
if( DEBUG > 1 ) {
rmr_vlog( RMR_VL_DEBUG, "size from accumulator =%d\n", river->msg_size );
if( river->msg_size > 500 ) {
buf2mbuf( ctx, river->accum, river->nbytes, fd ); // build an RMR mbuf and queue
river->accum = (char *) malloc( sizeof( char ) * river->nbytes ); // fresh accumulator
} else {
buf2mbuf( ctx, river->accum, river->nbytes, fd ); // build an RMR mbuf and queue
river->accum = (char *) malloc( sizeof( char ) * river->nbytes ); // fresh accumulator
} else {
- if( !(river->flags & RF_NOTIFIED) ) {
+ if( !(river->flags & RF_NOTIFIED) ) {
rmr_vlog( RMR_VL_WARN, "message larger than max (%d) have arrived on fd %d\n", river->nbytes, fd );
river->flags |= RF_NOTIFIED;
}
rmr_vlog( RMR_VL_WARN, "message larger than max (%d) have arrived on fd %d\n", river->nbytes, fd );
river->flags |= RF_NOTIFIED;
}
river->msg_size = -1;
river->ipt = 0;
bidx += need;
river->msg_size = -1;
river->ipt = 0;
bidx += need;
- Callback driven on a disconnect notification. We will attempt to find the related
- endpoint via the fd2ep hash maintained in the context. If we find it, then we
+ Callback driven on a disconnect notification. We will attempt to find the related
+ endpoint via the fd2ep hash maintained in the context. If we find it, then we
remove it from the hash, and mark the endpoint as closed so that the next attempt
to send forces a reconnect attempt.
remove it from the hash, and mark the endpoint as closed so that the next attempt
to send forces a reconnect attempt.
- ep = fd2ep_del( ctx, fd ); // find ep and remote the fd from the hash
- if( ep ) {
+ ep = fd2ep_del( ctx, fd ); // find ep and remove the fd from the hash
+ if( ep != NULL ) {
+ pthread_mutex_lock( &ep->gate ); // wise to lock this
ep->open = FALSE;
ep->nn_sock = -1;
ep->open = FALSE;
ep->nn_sock = -1;
+ pthread_mutex_unlock( &ep->gate );
rmr_set_vlevel( RMR_VL_INFO ); // we WILL announce our version etc
if( ! announced ) {
rmr_set_vlevel( RMR_VL_INFO ); // we WILL announce our version etc
if( ! announced ) {
- rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/f mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
+ rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/g mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
announced = 1;
}
RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
announced = 1;
}
+ // finish all flag setting before threads to keep helgrind quiet
+ ctx->flags |= CFL_MTC_ENABLED; // for SI threaded receiver is the only way
+
if( flags & FL_NOTHREAD ) { // thread set to off; no rout table collector started (could be called by the rtc thread itself)
ctx->rtable = rt_clone_space( NULL, NULL, 0 ); // creates an empty route table so that wormholes still can be used
} else {
if( flags & FL_NOTHREAD ) { // thread set to off; no rout table collector started (could be called by the rtc thread itself)
ctx->rtable = rt_clone_space( NULL, NULL, 0 ); // creates an empty route table so that wormholes still can be used
} else {
- ctx->flags |= CFL_MTC_ENABLED; // for SI threaded receiver is the only way
if( pthread_create( &ctx->mtc_th, NULL, mt_receive, (void *) ctx ) ) { // so kick it
rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
}
if( pthread_create( &ctx->mtc_th, NULL, mt_receive, (void *) ctx ) ) { // so kick it
rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
}
get things into a bad state if we allow a collision here. The lock grab
only happens on the intial session setup.
*/
get things into a bad state if we allow a collision here. The lock grab
only happens on the intial session setup.
*/
-static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) {
+//static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) {
+static int uta_link2( uta_ctx_t *ctx, endpoint_t* ep ) {
char* target;
char conn_info[SI_MAX_ADDR_LEN]; // string to give to nano to make the connection
char* addr;
char* target;
char conn_info[SI_MAX_ADDR_LEN]; // string to give to nano to make the connection
char* addr;
snprintf( conn_info, sizeof( conn_info ), "%s", target );
errno = 0;
if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "link2 attempting connection with: %s\n", conn_info );
snprintf( conn_info, sizeof( conn_info ), "%s", target );
errno = 0;
if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "link2 attempting connection with: %s\n", conn_info );
- if( (ep->nn_sock = SIconnect( si_ctx, conn_info )) < 0 ) {
+ if( (ep->nn_sock = SIconnect( ctx->si_ctx, conn_info )) < 0 ) {
pthread_mutex_unlock( &ep->gate );
if( ep->notify ) { // need to notify if set
pthread_mutex_unlock( &ep->gate );
if( ep->notify ) { // need to notify if set
if( DEBUG ) rmr_vlog( RMR_VL_INFO, "rmr_si_link2: connection was successful to: %s\n", target );
ep->open = TRUE; // set open/notify before giving up lock
if( DEBUG ) rmr_vlog( RMR_VL_INFO, "rmr_si_link2: connection was successful to: %s\n", target );
ep->open = TRUE; // set open/notify before giving up lock
+ fd2ep_add( ctx, ep->nn_sock, ep ); // map fd to ep for disc cleanup (while we have the lock)
if( ! ep->notify ) { // if we yammered about a failure, indicate finally good
rmr_vlog( RMR_VL_INFO, "rmr: link2: connection finally establisehd with target: %s\n", target );
if( ! ep->notify ) { // if we yammered about a failure, indicate finally good
rmr_vlog( RMR_VL_INFO, "rmr: link2: connection finally establisehd with target: %s\n", target );
- uta_link2( ctx->si_ctx, ep );
- if( ep->open ) {
- fd2ep_add( ctx, ep->nn_sock, ep ); // map fd to ep for disc cleanup
- }
endpoint cannot be found false (0) is returned.
*/
static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpoint_t** uepp ) {
endpoint cannot be found false (0) is returned.
*/
static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpoint_t** uepp ) {
si_ctx_t* si_ctx;
endpoint_t* ep;
int state = FALSE;
si_ctx_t* si_ctx;
endpoint_t* ep;
int state = FALSE;
if( ep->addr == NULL ) { // name didn't resolve before, try again
ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup
}
if( ep->addr == NULL ) { // name didn't resolve before, try again
ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup
}
- if( uta_link2( si_ctx, ep ) ) { // find entry in table and create link
+ if( uta_link2( ctx, ep ) ) { // find entry in table and create link
state = TRUE;
ep->open = TRUE;
*nn_sock = ep->nn_sock; // pass socket back to caller
state = TRUE;
ep->open = TRUE;
*nn_sock = ep->nn_sock; // pass socket back to caller
ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup
}
ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup
}
- if( uta_link2( si_ctx, ep ) ) { // find entry in table and create link
+ if( uta_link2( ctx, ep ) ) { // find entry in table and create link
ep->open = TRUE;
*nn_sock = ep->nn_sock; // pass socket back to caller
fd2ep_add( ctx, ep->nn_sock, ep ); // map fd to ep for disc cleanup
ep->open = TRUE;
*nn_sock = ep->nn_sock; // pass socket back to caller
fd2ep_add( ctx, ep->nn_sock, ep ); // map fd to ep for disc cleanup
ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup
}
ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup
}
- if( uta_link2( si_ctx, ep ) ) { // find entry in table and create link
+ if( uta_link2( ctx, ep ) ) { // find entry in table and create link
ep->open = TRUE;
*nn_sock = ep->nn_sock; // pass socket back to caller
} else {
ep->open = TRUE;
*nn_sock = ep->nn_sock; // pass socket back to caller
} else {
/*
Finds the rtable entry which matches the key. Returns a nil pointer if
/*
Finds the rtable entry which matches the key. Returns a nil pointer if
- no entry is found. If try_alternate is set, then we will attempt
+ no entry is found. If try_alternate is set, then we will attempt
to find the entry with a key based only on the message type.
*/
static inline rtable_ent_t* uta_get_rte( route_table_t *rt, int sid, int mtype, int try_alt ) {
to find the entry with a key based only on the message type.
*/
static inline rtable_ent_t* uta_get_rte( route_table_t *rt, int sid, int mtype, int try_alt ) {
/*
Create the hash which maps file descriptors to endpoints. We need this
/*
Create the hash which maps file descriptors to endpoints. We need this
- to easily mark an endpoint as disconnected when we are notified.
+ to easily mark an endpoint as disconnected when we are notified. Thus we
+ expect these to be driven very seldomly; locking should not be an issue.
+ Locking is needed to prevent problems when the user application is multi-
+ threaded and attempting to (re)connect from concurrent threads.
*/
static void fd2ep_init( uta_ctx_t* ctx ) {
*/
static void fd2ep_init( uta_ctx_t* ctx ) {
if( ctx && ! ctx->fd2ep ) {
if( ctx && ! ctx->fd2ep ) {
- ctx->fd2ep = rmr_sym_alloc( 129 );
+ ctx->fd2ep = rmr_sym_alloc( 129 );
+
+ if( ctx->fd2ep_gate == NULL ) {
+ ctx->fd2ep_gate = (pthread_mutex_t *) malloc( sizeof( *ctx->fd2ep_gate ) );
+ if( ctx->fd2ep_gate != NULL ) {
+ pthread_mutex_init( ctx->fd2ep_gate, NULL );
+ }
+ }
- Add an entry into the fd2ep hash which points to the given ep.
+ Add an entry into the fd2ep hash to map the FD to the endpoint.
*/
static void fd2ep_add( uta_ctx_t* ctx, int fd, endpoint_t* ep ) {
if( ctx && ctx->fd2ep ) {
*/
static void fd2ep_add( uta_ctx_t* ctx, int fd, endpoint_t* ep ) {
if( ctx && ctx->fd2ep ) {
+ pthread_mutex_lock( ctx->fd2ep_gate );
+
rmr_sym_map( ctx->fd2ep, (uint64_t) fd, (void *) ep );
rmr_sym_map( ctx->fd2ep, (uint64_t) fd, (void *) ep );
+
+ pthread_mutex_unlock( ctx->fd2ep_gate );
- Given a file descriptor fetches the related endpoint from the hash and
- deletes the entry from the hash (when we detect a disconnect).
+ Given a file descriptor this fetches the related endpoint from the hash and
+ deletes the entry from the hash (when we detect a disconnect).
+
+ This will also set the state on the ep open to false, and revoke the
+ FD (nn_socket).
*/
static endpoint_t* fd2ep_del( uta_ctx_t* ctx, int fd ) {
endpoint_t* ep = NULL;
*/
static endpoint_t* fd2ep_del( uta_ctx_t* ctx, int fd ) {
endpoint_t* ep = NULL;
if( ctx && ctx->fd2ep ) {
ep = rmr_sym_pull( ctx->fd2ep, (uint64_t) fd );
if( ep ) {
if( ctx && ctx->fd2ep ) {
ep = rmr_sym_pull( ctx->fd2ep, (uint64_t) fd );
if( ep ) {
+ pthread_mutex_lock( ctx->fd2ep_gate );
+
rmr_sym_ndel( ctx->fd2ep, (uint64_t) fd );
rmr_sym_ndel( ctx->fd2ep, (uint64_t) fd );
+
+ pthread_mutex_unlock( ctx->fd2ep_gate );
/*
Given a file descriptor fetches the related endpoint from the hash.
/*
Given a file descriptor fetches the related endpoint from the hash.
- Returns nil if there is no map.
+ Returns nil if there is no reference in the hash.
*/
static endpoint_t* fd2ep_get( uta_ctx_t* ctx, int fd ) {
endpoint_t* ep = NULL;
if( ctx && ctx->fd2ep ) {
*/
static endpoint_t* fd2ep_get( uta_ctx_t* ctx, int fd ) {
endpoint_t* ep = NULL;
if( ctx && ctx->fd2ep ) {
+ pthread_mutex_lock( ctx->fd2ep_gate );
+
ep = rmr_sym_pull( ctx->fd2ep, (uint64_t) fd );
ep = rmr_sym_pull( ctx->fd2ep, (uint64_t) fd );
+
+ pthread_mutex_unlock( ctx->fd2ep_gate );