Code Review
/
ric-plt
/
lib
/
rmr.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
Add ability to control route table req frequency
[ric-plt/lib/rmr.git]
/
src
/
rmr
/
si
/
src
/
rtable_si_static.c
diff --git
a/src/rmr/si/src/rtable_si_static.c
b/src/rmr/si/src/rtable_si_static.c
index
6215176
..
719053a
100644
(file)
--- a/
src/rmr/si/src/rtable_si_static.c
+++ b/
src/rmr/si/src/rtable_si_static.c
@@
-69,9
+69,9
@@
static void uta_ep_failed( endpoint_t* ep ) {
get things into a bad state if we allow a collision here. The lock grab
only happens on the intial session setup.
*/
get things into a bad state if we allow a collision here. The lock grab
only happens on the intial session setup.
*/
-static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) {
+//static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) {
+static int uta_link2( uta_ctx_t *ctx, endpoint_t* ep ) {
static int flags = 0;
static int flags = 0;
-
char* target;
char conn_info[SI_MAX_ADDR_LEN]; // string to give to nano to make the connection
char* addr;
char* target;
char conn_info[SI_MAX_ADDR_LEN]; // string to give to nano to make the connection
char* addr;
@@
-101,7
+101,7
@@
static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) {
snprintf( conn_info, sizeof( conn_info ), "%s", target );
errno = 0;
if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "link2 attempting connection with: %s\n", conn_info );
snprintf( conn_info, sizeof( conn_info ), "%s", target );
errno = 0;
if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "link2 attempting connection with: %s\n", conn_info );
- if( (ep->nn_sock = SIconnect( si_ctx, conn_info )) < 0 ) {
+ if( (ep->nn_sock = SIconnect(
ctx->
si_ctx, conn_info )) < 0 ) {
pthread_mutex_unlock( &ep->gate );
if( ep->notify ) { // need to notify if set
pthread_mutex_unlock( &ep->gate );
if( ep->notify ) { // need to notify if set
@@
-114,6
+114,7
@@
static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) {
if( DEBUG ) rmr_vlog( RMR_VL_INFO, "rmr_si_link2: connection was successful to: %s\n", target );
ep->open = TRUE; // set open/notify before giving up lock
if( DEBUG ) rmr_vlog( RMR_VL_INFO, "rmr_si_link2: connection was successful to: %s\n", target );
ep->open = TRUE; // set open/notify before giving up lock
+ fd2ep_add( ctx, ep->nn_sock, ep ); // map fd to ep for disc cleanup (while we have the lock)
if( ! ep->notify ) { // if we yammered about a failure, indicate finally good
rmr_vlog( RMR_VL_INFO, "rmr: link2: connection finally establisehd with target: %s\n", target );
if( ! ep->notify ) { // if we yammered about a failure, indicate finally good
rmr_vlog( RMR_VL_INFO, "rmr: link2: connection finally establisehd with target: %s\n", target );
@@
-143,10
+144,7
@@
static int rt_link2_ep( void* vctx, endpoint_t* ep ) {
return FALSE;
}
return FALSE;
}
- uta_link2( ctx->si_ctx, ep );
- if( ep->open ) {
- fd2ep_add( ctx, ep->nn_sock, ep ); // map fd to ep for disc cleanup
- }
+ uta_link2( ctx, ep );
return ep->open;
}
return ep->open;
}
@@
-220,13
+218,14
@@
extern endpoint_t* uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n
endpoint cannot be found false (0) is returned.
*/
static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpoint_t** uepp ) {
endpoint cannot be found false (0) is returned.
*/
static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpoint_t** uepp ) {
- route_table_t* rt
;
+ route_table_t* rt
= NULL;
si_ctx_t* si_ctx;
endpoint_t* ep;
int state = FALSE;
si_ctx_t* si_ctx;
endpoint_t* ep;
int state = FALSE;
- if( PAR
I
NOID_CHECKS ) {
+ if( PAR
A
NOID_CHECKS ) {
if( ctx == NULL || (rt = ctx->rtable) == NULL || (si_ctx = ctx->si_ctx) == NULL ) {
if( ctx == NULL || (rt = ctx->rtable) == NULL || (si_ctx = ctx->si_ctx) == NULL ) {
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_byname: parinoia check pop ctx=%p rt=%p\n", ctx, rt );
return FALSE;
}
} else {
return FALSE;
}
} else {
@@
-235,6
+234,7
@@
static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpo
}
ep = rmr_sym_get( rt->hash, ep_name, 1 );
}
ep = rmr_sym_get( rt->hash, ep_name, 1 );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_byname: ep not found: %s\n", ep_name );
if( uepp != NULL ) { // caller needs endpoint too, give it back
*uepp = ep;
}
if( uepp != NULL ) { // caller needs endpoint too, give it back
*uepp = ep;
}
@@
-250,7
+250,7
@@
static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpo
if( ep->addr == NULL ) { // name didn't resolve before, try again
ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup
}
if( ep->addr == NULL ) { // name didn't resolve before, try again
ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup
}
- if( uta_link2(
si_ctx, ep ) ) {
// find entry in table and create link
+ if( uta_link2(
ctx, ep ) ) {
// find entry in table and create link
state = TRUE;
ep->open = TRUE;
*nn_sock = ep->nn_sock; // pass socket back to caller
state = TRUE;
ep->open = TRUE;
*nn_sock = ep->nn_sock; // pass socket back to caller
@@
-299,7
+299,7
@@
static int uta_epsock_rr( uta_ctx_t* ctx, rtable_ent_t* rte, int group, int* mor
rrgroup_t* rrg;
int idx;
rrgroup_t* rrg;
int idx;
- if( PAR
I
NOID_CHECKS ) {
+ if( PAR
A
NOID_CHECKS ) {
if( ctx == NULL || (si_ctx = ctx->si_ctx) == NULL ) {
return FALSE;
}
if( ctx == NULL || (si_ctx = ctx->si_ctx) == NULL ) {
return FALSE;
}
@@
-369,7
+369,7
@@
static int uta_epsock_rr( uta_ctx_t* ctx, rtable_ent_t* rte, int group, int* mor
ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup
}
ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup
}
- if( uta_link2(
si_ctx, ep ) ) {
// find entry in table and create link
+ if( uta_link2(
ctx, ep ) ) {
// find entry in table and create link
ep->open = TRUE;
*nn_sock = ep->nn_sock; // pass socket back to caller
fd2ep_add( ctx, ep->nn_sock, ep ); // map fd to ep for disc cleanup
ep->open = TRUE;
*nn_sock = ep->nn_sock; // pass socket back to caller
fd2ep_add( ctx, ep->nn_sock, ep ); // map fd to ep for disc cleanup
@@
-394,6
+394,8
@@
static int uta_epsock_rr( uta_ctx_t* ctx, rtable_ent_t* rte, int group, int* mor
We've been told that the meid is a string, thus we count on it being a nil
terminated set of bytes.
We've been told that the meid is a string, thus we count on it being a nil
terminated set of bytes.
+
+ If we return false the caller's ep reference may NOT be valid or even nil.
*/
static int epsock_meid( uta_ctx_t* ctx, route_table_t *rtable, rmr_mbuf_t* msg, int* nn_sock, endpoint_t** uepp ) {
endpoint_t* ep; // seected end point
*/
static int epsock_meid( uta_ctx_t* ctx, route_table_t *rtable, rmr_mbuf_t* msg, int* nn_sock, endpoint_t** uepp ) {
endpoint_t* ep; // seected end point
@@
-401,7
+403,7
@@
static int epsock_meid( uta_ctx_t* ctx, route_table_t *rtable, rmr_mbuf_t* msg,
char* meid;
si_ctx_t* si_ctx;
char* meid;
si_ctx_t* si_ctx;
- if( PAR
I
NOID_CHECKS ) {
+ if( PAR
A
NOID_CHECKS ) {
if( ctx == NULL || (si_ctx = ctx->si_ctx) == NULL ) {
return FALSE;
}
if( ctx == NULL || (si_ctx = ctx->si_ctx) == NULL ) {
return FALSE;
}
@@
-417,11
+419,12
@@
static int epsock_meid( uta_ctx_t* ctx, route_table_t *rtable, rmr_mbuf_t* msg,
meid = ((uta_mhdr_t *) msg->header)->meid;
meid = ((uta_mhdr_t *) msg->header)->meid;
- if( (ep = get_meid_owner( rtable, meid )) == NULL ) {
-
if( uepp != NULL ) { // caller needs refernce to endpoint too
-
*uepp = NULL
;
-
}
+ ep = get_meid_owner( rtable, meid );
+ if( uepp != NULL ) { // caller needs refernce to endpoint too
+
*uepp = ep
;
+ }
+ if( ep == NULL ) {
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_meid: no ep in hash for (%s)\n", meid );
return FALSE;
}
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_meid: no ep in hash for (%s)\n", meid );
return FALSE;
}
@@
-432,7
+435,7
@@
static int epsock_meid( uta_ctx_t* ctx, route_table_t *rtable, rmr_mbuf_t* msg,
ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup
}
ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup
}
- if( uta_link2(
si_ctx, ep ) ) {
// find entry in table and create link
+ if( uta_link2(
ctx, ep ) ) {
// find entry in table and create link
ep->open = TRUE;
*nn_sock = ep->nn_sock; // pass socket back to caller
} else {
ep->open = TRUE;
*nn_sock = ep->nn_sock; // pass socket back to caller
} else {
@@
-448,7
+451,7
@@
static int epsock_meid( uta_ctx_t* ctx, route_table_t *rtable, rmr_mbuf_t* msg,
/*
Finds the rtable entry which matches the key. Returns a nil pointer if
/*
Finds the rtable entry which matches the key. Returns a nil pointer if
- no entry is found. If try_alternate is set, then we will attempt
+ no entry is found. If try_alternate is set, then we will attempt
to find the entry with a key based only on the message type.
*/
static inline rtable_ent_t* uta_get_rte( route_table_t *rt, int sid, int mtype, int try_alt ) {
to find the entry with a key based only on the message type.
*/
static inline rtable_ent_t* uta_get_rte( route_table_t *rt, int sid, int mtype, int try_alt ) {
@@
-508,26
+511,44
@@
static inline char* get_ep_counts( endpoint_t* ep, char* ubuf, int ubuf_len ) {
/*
Create the hash which maps file descriptors to endpoints. We need this
/*
Create the hash which maps file descriptors to endpoints. We need this
- to easily mark an endpoint as disconnected when we are notified.
+ to easily mark an endpoint as disconnected when we are notified. Thus we
+ expect these to be driven very seldomly; locking should not be an issue.
+ Locking is needed to prevent problems when the user application is multi-
+ threaded and attempting to (re)connect from concurrent threads.
*/
static void fd2ep_init( uta_ctx_t* ctx ) {
*/
static void fd2ep_init( uta_ctx_t* ctx ) {
+
if( ctx && ! ctx->fd2ep ) {
if( ctx && ! ctx->fd2ep ) {
- ctx->fd2ep = rmr_sym_alloc( 129 );
+ ctx->fd2ep = rmr_sym_alloc( 129 );
+
+ if( ctx->fd2ep_gate == NULL ) {
+ ctx->fd2ep_gate = (pthread_mutex_t *) malloc( sizeof( *ctx->fd2ep_gate ) );
+ if( ctx->fd2ep_gate != NULL ) {
+ pthread_mutex_init( ctx->fd2ep_gate, NULL );
+ }
+ }
}
}
/*
}
}
/*
- Add an entry into the fd2ep hash
which points to the given ep
.
+ Add an entry into the fd2ep hash
to map the FD to the endpoint
.
*/
static void fd2ep_add( uta_ctx_t* ctx, int fd, endpoint_t* ep ) {
if( ctx && ctx->fd2ep ) {
*/
static void fd2ep_add( uta_ctx_t* ctx, int fd, endpoint_t* ep ) {
if( ctx && ctx->fd2ep ) {
+ pthread_mutex_lock( ctx->fd2ep_gate );
+
rmr_sym_map( ctx->fd2ep, (uint64_t) fd, (void *) ep );
rmr_sym_map( ctx->fd2ep, (uint64_t) fd, (void *) ep );
+
+ pthread_mutex_unlock( ctx->fd2ep_gate );
}
}
/*
}
}
/*
- Given a file descriptor fetches the related endpoint from the hash and
- deletes the entry from the hash (when we detect a disconnect).
+ Given a file descriptor this fetches the related endpoint from the hash and
+ deletes the entry from the hash (when we detect a disconnect).
+
+ This will also set the state on the ep open to false, and revoke the
+ FD (nn_socket).
*/
static endpoint_t* fd2ep_del( uta_ctx_t* ctx, int fd ) {
endpoint_t* ep = NULL;
*/
static endpoint_t* fd2ep_del( uta_ctx_t* ctx, int fd ) {
endpoint_t* ep = NULL;
@@
-535,7
+556,11
@@
static endpoint_t* fd2ep_del( uta_ctx_t* ctx, int fd ) {
if( ctx && ctx->fd2ep ) {
ep = rmr_sym_pull( ctx->fd2ep, (uint64_t) fd );
if( ep ) {
if( ctx && ctx->fd2ep ) {
ep = rmr_sym_pull( ctx->fd2ep, (uint64_t) fd );
if( ep ) {
+ pthread_mutex_lock( ctx->fd2ep_gate );
+
rmr_sym_ndel( ctx->fd2ep, (uint64_t) fd );
rmr_sym_ndel( ctx->fd2ep, (uint64_t) fd );
+
+ pthread_mutex_unlock( ctx->fd2ep_gate );
}
}
}
}
@@
-544,13
+569,17
@@
static endpoint_t* fd2ep_del( uta_ctx_t* ctx, int fd ) {
/*
Given a file descriptor fetches the related endpoint from the hash.
/*
Given a file descriptor fetches the related endpoint from the hash.
- Returns nil if there is no
map
.
+ Returns nil if there is no
reference in the hash
.
*/
static endpoint_t* fd2ep_get( uta_ctx_t* ctx, int fd ) {
endpoint_t* ep = NULL;
if( ctx && ctx->fd2ep ) {
*/
static endpoint_t* fd2ep_get( uta_ctx_t* ctx, int fd ) {
endpoint_t* ep = NULL;
if( ctx && ctx->fd2ep ) {
+ pthread_mutex_lock( ctx->fd2ep_gate );
+
ep = rmr_sym_pull( ctx->fd2ep, (uint64_t) fd );
ep = rmr_sym_pull( ctx->fd2ep, (uint64_t) fd );
+
+ pthread_mutex_unlock( ctx->fd2ep_gate );
}
return ep;
}
return ep;