user supplied pointer so that a success/fail code is returned directly.
Return value is 0 (false) on failure, 1 (true) on success.
- In order to support multi-threaded user applications we must hold a lock before
- we attempt to create a dialer and connect. NNG is thread safe, but we can
+ In order to support multi-threaded user applications we must hold a lock before
+ we attempt to create a dialer and connect. NNG is thread safe, but we can
get things into a bad state if we allow a collision here. The lock grab
only happens on the initial session setup.
*/
//static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer, pthread_mutex* gate ) {
static int uta_link2( endpoint_t* ep ) {
- char* target;
- nng_socket* nn_sock;
+ char* target;
+ nng_socket* nn_sock;
nng_dialer* dialer;
char conn_info[NNG_MAXADDRLEN]; // string to give to nano to make the connection
char* addr;
pthread_mutex_unlock( &ep->gate );
return TRUE;
}
-
+
if( nng_push0_open( nn_sock ) != 0 ) { // and assign the mode
pthread_mutex_unlock( &ep->gate );
snprintf( conn_info, sizeof( conn_info ), "tcp://%s", target );
if( (state = nng_dialer_create( dialer, *nn_sock, conn_info )) != 0 ) {
pthread_mutex_unlock( &ep->gate );
- fprintf( stderr, "[WARN] rmr: link2: unable to create dialer for link to target: %s: %d\n", target, errno );
+ fprintf( stderr, "[WRN] rmr: link2: unable to create dialer for link to target: %s: %d\n", target, errno );
nng_close( *nn_sock );
return FALSE;
}
	if( (state = nng_dialer_start( *dialer, NO_FLAGS )) != 0 ) {						// can fail immediately (unlike nanomsg)
pthread_mutex_unlock( &ep->gate );
- fprintf( stderr, "[WARN] rmr: unable to create link to target: %s: %s\n", target, nng_strerror( state ) );
+ fprintf( stderr, "[WRN] rmr: unable to create link to target: %s: %s\n", target, nng_strerror( state ) );
nng_close( *nn_sock );
return FALSE;
}
rrgroup_t* rrg; // pointer at group to update
if( ! rte || ! rt ) {
- fprintf( stderr, "[WARN] uda_add_ep didn't get a valid rt and/or rte pointer\n" );
+ fprintf( stderr, "[WRN] uda_add_ep didn't get a valid rt and/or rte pointer\n" );
return NULL;
}
if( rte->nrrgroups <= group ) {
- fprintf( stderr, "[WARN] uda_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups );
+ fprintf( stderr, "[WRN] uda_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups );
return NULL;
}
if( (rrg = rte->rrgroups[group]) == NULL ) {
if( (rrg = (rrgroup_t *) malloc( sizeof( *rrg ) )) == NULL ) {
- fprintf( stderr, "[WARN] rmr_add_ep: malloc failed for round robin group: group=%d\n", group );
+ fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for round robin group: group=%d\n", group );
return NULL;
}
memset( rrg, 0, sizeof( *rrg ) );
if( (rrg->epts = (endpoint_t **) malloc( sizeof( endpoint_t ) * MAX_EP_GROUP )) == NULL ) {
- fprintf( stderr, "[WARN] rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group );
+ fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group );
return NULL;
}
memset( rrg->epts, 0, sizeof( endpoint_t ) * MAX_EP_GROUP );
if( rrg != NULL ) {
if( rrg->nused >= rrg->nendpts ) {
// future: reallocate
- fprintf( stderr, "[WARN] endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group );
+ fprintf( stderr, "[WRN] endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group );
return NULL;
}
if( ! ep->open ) { // not open -- connect now
if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s session not started... starting\n", ep_name );
if( ep->addr == NULL ) { // name didn't resolve before, try again
- ep->addr = uta_h2ip( ep->name );
+ ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup
}
if( uta_link2( ep ) ) { // find entry in table and create link
state = TRUE;
We return the index+1 from the round robin table on success so that we can verify
	during test that different entries are being selected; we cannot depend on the nng
socket being different as we could with nano.
+
+ NOTE: The round robin selection index increment might collide with other
+ threads if multiple threads are attempting to send to the same round
+ robin group; the consequences are small and avoid locking. The only side
+ effect is either sending two messages in a row to, or skipping, an endpoint.
+	Both of these, in the grand scheme of things, are minor compared to the
+ overhead of grabbing a lock on each call.
*/
static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more, nng_socket* nn_sock ) {
rtable_ent_t* rte; // matching rt entry
int state = FALSE; // processing state
int dummy;
rrgroup_t* rrg;
+ int idx;
- if( ! more ) { // eliminate cheks each time we need to user
+	if( ! more ) {				// eliminate checks each time we need to use
more = &dummy;
}
break;
default: // need to pick one and adjust rr counts
- ep = rrg->epts[rrg->ep_idx++]; // select next endpoint
+
+ idx = rrg->ep_idx++ % rrg->nused; // see note above
+ ep = rrg->epts[idx]; // select next endpoint
//if( DEBUG ) fprintf( stderr, ">>>> _rr returning socket with multiple choices in group idx=%d \n", rrg->ep_idx );
- if( rrg->ep_idx >= rrg->nused ) {
- rrg->ep_idx = 0;
- }
- state = rrg->ep_idx+1;
+ state = idx + 1; // unit test checks to see that we're cycling through, so must not just be TRUE
break;
}
if( state ) { // end point selected, open if not, get socket either way
if( ! ep->open ) { // not connected
if( ep->addr == NULL ) { // name didn't resolve before, try again
- ep->addr = uta_h2ip( ep->name );
+ ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup
}
if( uta_link2( ep ) ) { // find entry in table and create link