Target assumed to be address:port. The new socket is returned via the
user supplied pointer so that a success/fail code is returned directly.
Return value is 0 (false) on failure, 1 (true) on success.
+
+ In order to support multi-threaded user applications we must hold a lock before
+ we attempt to create a dialer and connect. NNG is thread safe, but we can
+ get things into a bad state if we allow a collision here. The lock grab
+ only happens on the intial session setup.
*/
-static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer ) {
+//static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer, pthread_mutex* gate ) {
+static int uta_link2( endpoint_t* ep ) {
+ char* target;
+ nng_socket* nn_sock;
+ nng_dialer* dialer;
char conn_info[NNG_MAXADDRLEN]; // string to give to nano to make the connection
char* addr;
int state = FALSE;
+ if( ep == NULL ) {
+ return FALSE;
+ }
+
+ target = ep->addr;
+ nn_sock = &ep->nn_sock;
+ dialer = &ep->dialer;
+
if( target == NULL || (addr = strchr( target, ':' )) == NULL ) { // bad address:port
fprintf( stderr, "rmr: link2: unable to create link: bad target: %s\n", target == NULL ? "<nil>" : target );
return FALSE;
return FALSE;
}
+ pthread_mutex_lock( &ep->gate ); // grab the lock
+ if( ep->open ) {
+ pthread_mutex_unlock( &ep->gate );
+ return TRUE;
+ }
+
+
if( nng_push0_open( nn_sock ) != 0 ) { // and assign the mode
+ pthread_mutex_unlock( &ep->gate );
fprintf( stderr, "[CRI] rmr: link2: unable to initialise nanomsg push socket to: %s\n", target );
return FALSE;
}
snprintf( conn_info, sizeof( conn_info ), "tcp://%s", target );
if( (state = nng_dialer_create( dialer, *nn_sock, conn_info )) != 0 ) {
+ pthread_mutex_unlock( &ep->gate );
fprintf( stderr, "[WARN] rmr: link2: unable to create dialer for link to target: %s: %d\n", target, errno );
nng_close( *nn_sock );
return FALSE;
nng_dialer_setopt_ms( *dialer, NNG_OPT_RECONNMINT, 100 ); // start retry 100m after last failure with 2s cap
if( (state = nng_dialer_start( *dialer, NO_FLAGS )) != 0 ) { // can fail immediatly (unlike nanomsg)
+ pthread_mutex_unlock( &ep->gate );
fprintf( stderr, "[WARN] rmr: unable to create link to target: %s: %s\n", target, nng_strerror( state ) );
nng_close( *nn_sock );
return FALSE;
if( DEBUG ) fprintf( stderr, "[INFO] rmr_link2l: dial was successful: %s\n", target );
+ ep->open = TRUE; // must set before release
+ pthread_mutex_unlock( &ep->gate );
return TRUE;
}
return TRUE;
}
- ep->open = uta_link2( ep->addr, &ep->nn_sock, &ep->dialer );
+ uta_link2( ep );
return ep->open;
}
if( ep->addr == NULL ) { // name didn't resolve before, try again
ep->addr = uta_h2ip( ep->name );
}
- if( uta_link2( ep->addr, &ep->nn_sock, &ep->dialer ) ) { // find entry in table and create link
+ if( uta_link2( ep ) ) { // find entry in table and create link
state = TRUE;
ep->open = TRUE;
*nn_sock = ep->nn_sock; // pass socket back to caller
ep->addr = uta_h2ip( ep->name );
}
- if( uta_link2( ep->addr, &ep->nn_sock, &ep->dialer ) ) { // find entry in table and create link
+ if( uta_link2( ep ) ) { // find entry in table and create link
ep->open = TRUE;
*nn_sock = ep->nn_sock; // pass socket back to caller
} else {