enhance(API): Add multi-threaded call
[ric-plt/lib/rmr.git] / src / rmr / nng / src / rtable_nng_static.c
index 6122f69..d903de3 100644 (file)
        Target assumed to be address:port.  The new socket is returned via the
        user supplied pointer so that a success/fail code is returned directly.
        Return value is 0 (false) on failure, 1 (true)  on success.
+
+       In order to support multi-threaded user applications we must hold a lock before 
+       we attempt to create a dialer and connect. NNG is thread safe, but we can 
+       get things into a bad state if we allow a collision here.  The lock grab
+       only happens on the intial session setup.
 */
-static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer ) {
+//static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer, pthread_mutex* gate ) {
+static int uta_link2( endpoint_t* ep ) {
+       char*           target; 
+       nng_socket*     nn_sock; 
+       nng_dialer*     dialer;
        char            conn_info[NNG_MAXADDRLEN];      // string to give to nano to make the connection
        char*           addr;
        int                     state = FALSE;
 
+       if( ep == NULL ) {
+               return FALSE;
+       }
+
+       target = ep->addr;
+       nn_sock = &ep->nn_sock;
+       dialer = &ep->dialer;
+
        if( target == NULL  ||  (addr = strchr( target, ':' )) == NULL ) {              // bad address:port
                fprintf( stderr, "rmr: link2: unable to create link: bad target: %s\n", target == NULL ? "<nil>" : target );
                return FALSE;
@@ -69,13 +86,22 @@ static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer ) {
                return FALSE;
        }
 
+       pthread_mutex_lock( &ep->gate );                        // grab the lock
+       if( ep->open ) {
+               pthread_mutex_unlock( &ep->gate );
+               return TRUE;
+       }
+       
+
        if( nng_push0_open( nn_sock ) != 0 ) {                  // and assign the mode
+               pthread_mutex_unlock( &ep->gate );
                fprintf( stderr, "[CRI] rmr: link2: unable to initialise nanomsg push socket to: %s\n", target );
                return FALSE;
        }
 
        snprintf( conn_info, sizeof( conn_info ), "tcp://%s", target );
        if( (state = nng_dialer_create( dialer, *nn_sock, conn_info )) != 0 ) {
+               pthread_mutex_unlock( &ep->gate );
                fprintf( stderr, "[WARN] rmr: link2: unable to create dialer for link to target: %s: %d\n", target, errno );
                nng_close( *nn_sock );
                return FALSE;
@@ -85,6 +111,7 @@ static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer ) {
        nng_dialer_setopt_ms( *dialer,  NNG_OPT_RECONNMINT, 100 );              // start retry 100m after last failure with 2s cap
 
        if( (state = nng_dialer_start( *dialer, NO_FLAGS )) != 0 ) {                                            // can fail immediatly (unlike nanomsg)
+               pthread_mutex_unlock( &ep->gate );
                fprintf( stderr, "[WARN] rmr: unable to create link to target: %s: %s\n", target, nng_strerror( state ) );
                nng_close( *nn_sock );
                return FALSE;
@@ -92,6 +119,8 @@ static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer ) {
 
        if( DEBUG ) fprintf( stderr, "[INFO] rmr_link2l: dial was successful: %s\n", target );
 
+       ep->open = TRUE;                                                // must set before release
+       pthread_mutex_unlock( &ep->gate );
        return TRUE;
 }
 
@@ -108,7 +137,7 @@ static int rt_link2_ep( endpoint_t* ep ) {
                return TRUE;
        }
 
-       ep->open =  uta_link2( ep->addr, &ep->nn_sock, &ep->dialer );
+       uta_link2( ep );
        return ep->open;
 }
 
@@ -198,7 +227,7 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_s
                if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
                        ep->addr = uta_h2ip( ep->name );
                }
-               if( uta_link2( ep->addr, &ep->nn_sock, &ep->dialer ) ) {                // find entry in table and create link
+               if( uta_link2( ep ) ) {                                                                                 // find entry in table and create link
                        state = TRUE;
                        ep->open = TRUE;
                        *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
@@ -302,7 +331,7 @@ static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more,
                                ep->addr = uta_h2ip( ep->name );
                        }
 
-                       if( uta_link2( ep->addr, &ep->nn_sock, &ep->dialer ) ) {                // find entry in table and create link
+                       if( uta_link2( ep ) ) {                                                                                 // find entry in table and create link
                                ep->open = TRUE;
                                *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
                        } else {