-// : vi ts=4 sw=4 noet :
+// vim: ts=4 sw=4 noet :
/*
==================================================================================
Copyright (c) 2019 Nokia
get things into a bad state if we allow a collision here. The lock grab
only happens on the intial session setup.
*/
-//static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer, pthread_mutex* gate ) {
static int uta_link2( endpoint_t* ep ) {
+ static int flags = -1;
+
char* target;
nng_socket* nn_sock;
nng_dialer* dialer;
char conn_info[NNG_MAXADDRLEN]; // string to give to nano to make the connection
char* addr;
int state = FALSE;
+ char* tok;
if( ep == NULL ) {
return FALSE;
}
- target = ep->addr;
+ if( flags < 0 ) {
+ tok = getenv( "RMR_ASYNC_CONN" );
+ if( tok == NULL || *tok == '1' ) {
+ flags = NNG_FLAG_NONBLOCK; // start dialer asynch
+ } else {
+ flags = NO_FLAGS;
+ }
+ }
+
+ target = ep->name; // always give name to transport so chaning dest IP does not break reconnect
nn_sock = &ep->nn_sock;
dialer = &ep->dialer;
if( nng_push0_open( nn_sock ) != 0 ) { // and assign the mode
pthread_mutex_unlock( &ep->gate );
- fprintf( stderr, "[CRI] rmr: link2: unable to initialise nanomsg push socket to: %s\n", target );
+ rmr_vlog( RMR_VL_CRIT, "rmr: link2: unable to initialise nanomsg push socket to: %s\n", target );
return FALSE;
}
snprintf( conn_info, sizeof( conn_info ), "tcp://%s", target );
if( (state = nng_dialer_create( dialer, *nn_sock, conn_info )) != 0 ) {
pthread_mutex_unlock( &ep->gate );
- fprintf( stderr, "[WRN] rmr: link2: unable to create dialer for link to target: %s: %d\n", target, errno );
+ rmr_vlog( RMR_VL_WARN, "rmr: link2: unable to create dialer for link to target: %s: %d\n", target, errno );
nng_close( *nn_sock );
return FALSE;
}
nng_dialer_setopt_ms( *dialer, NNG_OPT_RECONNMAXT, 2000 ); // cap backoff on retries to reasonable amount (2s)
nng_dialer_setopt_ms( *dialer, NNG_OPT_RECONNMINT, 100 ); // start retry 100m after last failure with 2s cap
- if( (state = nng_dialer_start( *dialer, NO_FLAGS )) != 0 ) { // can fail immediatly (unlike nanomsg)
+ if( (state = nng_dialer_start( *dialer, flags )) != 0 ) { // can fail immediatly (unlike nanomsg)
pthread_mutex_unlock( &ep->gate );
- fprintf( stderr, "[WRN] rmr: unable to create link to target: %s: %s\n", target, nng_strerror( state ) );
+ rmr_vlog( RMR_VL_WARN, "rmr: unable to create link to target: %s: %s\n", target, nng_strerror( state ) );
nng_close( *nn_sock );
return FALSE;
}
- if( DEBUG ) fprintf( stderr, "[INFO] rmr_link2l: dial was successful: %s\n", target );
+ if( DEBUG ) rmr_vlog( RMR_VL_INFO, "rmr_link2l: dial was successful: %s\n", target );
ep->open = TRUE; // must set before release
pthread_mutex_unlock( &ep->gate );
/*
This provides a protocol independent mechanism for establishing the connection to an endpoint.
Return is true (1) if the link was opened; false on error.
+
+ For some flavours, the context is needed by this function, but not for nng.
*/
-static int rt_link2_ep( endpoint_t* ep ) {
+static int rt_link2_ep( void* vctx, endpoint_t* ep ) {
if( ep == NULL ) {
return FALSE;
}
rrgroup_t* rrg; // pointer at group to update
if( ! rte || ! rt ) {
- fprintf( stderr, "[WRN] uda_add_ep didn't get a valid rt and/or rte pointer\n" );
+ rmr_vlog( RMR_VL_WARN, "uda_add_ep didn't get a valid rt and/or rte pointer\n" );
return NULL;
}
if( rte->nrrgroups <= group ) {
- fprintf( stderr, "[WRN] uda_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups );
+ rmr_vlog( RMR_VL_WARN, "uda_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups );
return NULL;
}
if( (rrg = rte->rrgroups[group]) == NULL ) {
if( (rrg = (rrgroup_t *) malloc( sizeof( *rrg ) )) == NULL ) {
- fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for round robin group: group=%d\n", group );
+ rmr_vlog( RMR_VL_WARN, "rmr_add_ep: malloc failed for round robin group: group=%d\n", group );
return NULL;
}
memset( rrg, 0, sizeof( *rrg ) );
if( (rrg->epts = (endpoint_t **) malloc( sizeof( endpoint_t ) * MAX_EP_GROUP )) == NULL ) {
- fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group );
+ rmr_vlog( RMR_VL_WARN, "rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group );
return NULL;
}
memset( rrg->epts, 0, sizeof( endpoint_t ) * MAX_EP_GROUP );
if( rrg != NULL ) {
if( rrg->nused >= rrg->nendpts ) {
// future: reallocate
- fprintf( stderr, "[WRN] endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group );
+ rmr_vlog( RMR_VL_WARN, "endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group );
return NULL;
}
rrg->nused++;
}
- if( DEBUG > 1 ) fprintf( stderr, "[DBUG] endpoint added to mtype/group: %d/%d %s\n", rte->mtype, group, ep_name );
+ if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "endpoint added to mtype/group: %d/%d %s\n", rte->mtype, group, ep_name );
return ep;
}
the user pointer passed in and sets the return value to true (1). If the
endpoint cannot be found false (0) is returned.
*/
-static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_sock ) {
+static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_sock, endpoint_t** uepp ) {
endpoint_t* ep;
int state = FALSE;
}
ep = rmr_sym_get( rt->hash, ep_name, 1 );
+ if( uepp != NULL ) { // caller needs endpoint too, give it back
+ *uepp = ep;
+ }
if( ep == NULL ) {
- if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s not in hash!\n", ep_name );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "get ep by name for %s not in hash!\n", ep_name );
if( ! ep_name || (ep = rt_ensure_ep( rt, ep_name)) == NULL ) { // create one if not in rt (support rts without entry in our table)
return FALSE;
}
}
if( ! ep->open ) { // not open -- connect now
- if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s session not started... starting\n", ep_name );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "get ep by name for %s session not started... starting\n", ep_name );
if( ep->addr == NULL ) { // name didn't resolve before, try again
ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup
}
ep->open = TRUE;
*nn_sock = ep->nn_sock; // pass socket back to caller
}
- if( DEBUG ) fprintf( stderr, "[DBUG] epsock_bn: connection state: %s %s\n", state ? "[OK]" : "[FAIL]", ep->name );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_bn: connection state: %s %s\n", state ? "[OK]" : "[FAIL]", ep->name );
} else {
*nn_sock = ep->nn_sock;
state = TRUE;
/*
Make a round robin selection within a round robin group for a route table
entry. Returns the nanomsg socket if there is a rte for the message
- type, and group is defined. Socket is returned via pointer in the parm
+ key, and group is defined. Socket is returned via pointer in the parm
list (nn_sock).
The group is the group number to select from.
Both of these, in the grand scheme of things, is minor compared to the
overhead of grabbing a lock on each call.
*/
-static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more, nng_socket* nn_sock ) {
- rtable_ent_t* rte; // matching rt entry
+static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, nng_socket* nn_sock, endpoint_t** uepp ) {
endpoint_t* ep; // seected end point
int state = FALSE; // processing state
int dummy;
return FALSE;
}
- if( rt == NULL ) {
- *more = 0;
- return FALSE;
- }
-
- if( (rte = rmr_sym_pull( rt->hash, key )) == NULL ) {
+ if( rte == NULL ) {
*more = 0;
- //if( DEBUG ) fprintf( stderr, ">>>> rte not found for type = %lu\n", key );
return FALSE;
}
if( group < 0 || group >= rte->nrrgroups ) {
- //if( DEBUG ) fprintf( stderr, ">>>> group out of range: key=%lu group=%d max=%d\n", key, group, rte->nrrgroups );
+ //if( DEBUG ) fprintf( stderr, ">>>> group out of range: group=%d max=%d\n", group, rte->nrrgroups );
*more = 0;
return FALSE;
}
if( (rrg = rte->rrgroups[group]) == NULL ) {
- //if( DEBUG ) fprintf( stderr, ">>>> rrg not found for type key=%lu\n", key );
+ //if( DEBUG ) fprintf( stderr, ">>>> rrg not found for group \n", group );
*more = 0; // groups are inserted contig, so nothing should be after a nil pointer
return FALSE;
}
//if( DEBUG ) fprintf( stderr, ">>>> nothing allocated for the rrg\n" );
return FALSE;
- case 1: // exactly one, no rr to deal with and more is not possible even if fanout > 1
- //*nn_sock = rrg->epts[0]->nn_sock;
+ case 1: // exactly one, no rr to deal with
ep = rrg->epts[0];
//if( DEBUG ) fprintf( stderr, ">>>> _rr returning socket with one choice in group \n" );
state = TRUE;
break;
}
+ if( uepp != NULL ) { // caller needs refernce to endpoint too
+ *uepp = ep;
+ }
if( state ) { // end point selected, open if not, get socket either way
if( ! ep->open ) { // not connected
if( ep->addr == NULL ) { // name didn't resolve before, try again
} else {
state = FALSE;
}
- if( DEBUG ) fprintf( stderr, "[DBUG] epsock_rr: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" );
} else {
*nn_sock = ep->nn_sock;
}
return state;
}
+/*
+ Finds the rtable entry which matches the key. Returns a nil pointer if
+ no entry is found. If try_alternate is set, then we will attempt
+ to find the entry with a key based only on the message type.
+*/
+static inline rtable_ent_t* uta_get_rte( route_table_t *rt, int sid, int mtype, int try_alt ) {
+ uint64_t key; // key is sub id and mtype banged together
+ rtable_ent_t* rte; // the entry we found
+
+ if( rt == NULL || rt->hash == NULL ) {
+ return NULL;
+ }
+
+ key = build_rt_key( sid, mtype ); // first try with a 'full' key
+ if( ((rte = rmr_sym_pull( rt->hash, key )) != NULL) || ! try_alt ) { // found or not allowed to try the alternate, return what we have
+ return rte;
+ }
+
+ if( sid != UNSET_SUBID ) { // not found, and allowed to try alternate; and the sub_id was set
+ key = build_rt_key( UNSET_SUBID, mtype ); // rebuild key
+ rte = rmr_sym_pull( rt->hash, key ); // see what we get with this
+ }
+
+ return rte;
+}
+
+/*
+ Return a string of count information. E.g.:
+ <ep-name>:<port> <good> <hard-fail> <soft-fail>
+
+ Caller must free the string allocated if a buffer was not provided.
+
+ Pointer returned is to a freshly allocated string, or the user buffer
+ for convenience.
+
+ If the endpoint passed is a nil pointer, then we return a nil -- caller
+ must check!
+*/
+static inline char* get_ep_counts( endpoint_t* ep, char* ubuf, int ubuf_len ) {
+ char* rs; // result string
+
+ if( ep == NULL ) {
+ return NULL;
+ }
+
+ if( ubuf != NULL ) {
+ rs = ubuf;
+ } else {
+ ubuf_len = 256;
+ rs = malloc( sizeof( char ) * ubuf_len );
+ }
+
+ snprintf( rs, ubuf_len, "%s %lld %lld %lld", ep->name, ep->scounts[EPSC_GOOD], ep->scounts[EPSC_FAIL], ep->scounts[EPSC_TRANS] );
+
+ return rs;
+}
+
+/*
+ Given a message, use the meid field to find the owner endpoint for the meid.
+ The owner ep is then used to extract the socket through which the message
+ is sent. This returns TRUE if we found a socket and it was written to the
+ nn_sock pointer; false if we didn't.
+
+ We've been told that the meid is a string, thus we count on it being a nil
+ terminated set of bytes.
+
+ If we return false there is no guarentee that the caller's reference to the
+ ep is valid or nil. Caller can trus the ep reference only when the return is
+ true.
+*/
+static int epsock_meid( route_table_t *rtable, rmr_mbuf_t* msg, nng_socket* nn_sock, endpoint_t** uepp ) {
+ endpoint_t* ep; // seected end point
+ int state = FALSE; // processing state
+ char* meid;
+
+ errno = 0;
+ if( ! nn_sock || msg == NULL || rtable == NULL ) { // missing stuff; bail fast
+ errno = EINVAL;
+ return FALSE;
+ }
+
+ meid = ((uta_mhdr_t *) msg->header)->meid;
+
+ if( (ep = get_meid_owner( rtable, meid )) == NULL ) {
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_meid: no ep in hash for (%s)\n", meid );
+ return FALSE;
+ }
+
+ if( uepp != NULL ) { // ensure ep is returned to the caller
+ *uepp = ep;
+ }
+
+ state = TRUE;
+ if( ! ep->open ) { // not connected
+ if( ep->addr == NULL ) { // name didn't resolve before, try again
+ ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup
+ }
+
+ if( uta_link2( ep ) ) { // find entry in table and create link
+ ep->open = TRUE;
+ *nn_sock = ep->nn_sock; // pass socket back to caller
+ } else {
+ state = FALSE;
+ }
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_meid: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" );
+ } else {
+ *nn_sock = ep->nn_sock;
+ }
+
+ return state;
+}
+
#endif