fix(rtable): Prevent direct send hairpins
[ric-plt/lib/rmr.git] / src / rmr / nng / src / rtable_nng_static.c
index 6122f69..fefa63d 100644 (file)
        Target assumed to be address:port.  The new socket is returned via the
        user supplied pointer so that a success/fail code is returned directly.
        Return value is 0 (false) on failure, 1 (true)  on success.
+
+       In order to support multi-threaded user applications we must hold a lock before
+       we attempt to create a dialer and connect. NNG is thread safe, but we can
+       get things into a bad state if we allow a collision here.  The lock grab
+       only happens on the intial session setup.
 */
-static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer ) {
+//static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer, pthread_mutex* gate ) {
+static int uta_link2( endpoint_t* ep ) {
+       char*           target;
+       nng_socket*     nn_sock;
+       nng_dialer*     dialer;
        char            conn_info[NNG_MAXADDRLEN];      // string to give to nano to make the connection
        char*           addr;
        int                     state = FALSE;
 
+       if( ep == NULL ) {
+               return FALSE;
+       }
+
+       target = ep->addr;
+       nn_sock = &ep->nn_sock;
+       dialer = &ep->dialer;
+
        if( target == NULL  ||  (addr = strchr( target, ':' )) == NULL ) {              // bad address:port
                fprintf( stderr, "rmr: link2: unable to create link: bad target: %s\n", target == NULL ? "<nil>" : target );
                return FALSE;
@@ -69,13 +86,22 @@ static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer ) {
                return FALSE;
        }
 
+       pthread_mutex_lock( &ep->gate );                        // grab the lock
+       if( ep->open ) {
+               pthread_mutex_unlock( &ep->gate );
+               return TRUE;
+       }
+
+
        if( nng_push0_open( nn_sock ) != 0 ) {                  // and assign the mode
+               pthread_mutex_unlock( &ep->gate );
                fprintf( stderr, "[CRI] rmr: link2: unable to initialise nanomsg push socket to: %s\n", target );
                return FALSE;
        }
 
        snprintf( conn_info, sizeof( conn_info ), "tcp://%s", target );
        if( (state = nng_dialer_create( dialer, *nn_sock, conn_info )) != 0 ) {
+               pthread_mutex_unlock( &ep->gate );
                fprintf( stderr, "[WARN] rmr: link2: unable to create dialer for link to target: %s: %d\n", target, errno );
                nng_close( *nn_sock );
                return FALSE;
@@ -85,6 +111,7 @@ static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer ) {
        nng_dialer_setopt_ms( *dialer,  NNG_OPT_RECONNMINT, 100 );              // start retry 100m after last failure with 2s cap
 
        if( (state = nng_dialer_start( *dialer, NO_FLAGS )) != 0 ) {                                            // can fail immediatly (unlike nanomsg)
+               pthread_mutex_unlock( &ep->gate );
                fprintf( stderr, "[WARN] rmr: unable to create link to target: %s: %s\n", target, nng_strerror( state ) );
                nng_close( *nn_sock );
                return FALSE;
@@ -92,6 +119,8 @@ static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer ) {
 
        if( DEBUG ) fprintf( stderr, "[INFO] rmr_link2l: dial was successful: %s\n", target );
 
+       ep->open = TRUE;                                                // must set before release
+       pthread_mutex_unlock( &ep->gate );
        return TRUE;
 }
 
@@ -108,7 +137,7 @@ static int rt_link2_ep( endpoint_t* ep ) {
                return TRUE;
        }
 
-       ep->open =  uta_link2( ep->addr, &ep->nn_sock, &ep->dialer );
+       uta_link2( ep );
        return ep->open;
 }
 
@@ -196,9 +225,9 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_s
        if( ! ep->open )  {                                                                             // not open -- connect now
                if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s session not started... starting\n", ep_name );
                if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
-                       ep->addr = uta_h2ip( ep->name );
+                       ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
                }
-               if( uta_link2( ep->addr, &ep->nn_sock, &ep->dialer ) ) {                // find entry in table and create link
+               if( uta_link2( ep ) ) {                                                                                 // find entry in table and create link
                        state = TRUE;
                        ep->open = TRUE;
                        *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
@@ -230,6 +259,13 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_s
        We return the index+1 from the round robin table on success so that we can verify
        during test that different entries are being seleted; we cannot depend on the nng
        socket being different as we could with nano.
+
+       NOTE:   The round robin selection index increment might collide with other
+               threads if multiple threads are attempting to send to the same round
+               robin group; the consequences are small and avoid locking. The only side
+               effect is either sending two messages in a row to, or skipping, an endpoint.
+               Both of these, in the grand scheme of things, is minor compared to the
+               overhead of grabbing a lock on each call.
 */
 static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more, nng_socket* nn_sock ) {
        rtable_ent_t* rte;                      // matching rt entry
@@ -237,9 +273,10 @@ static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more,
        int  state = FALSE;                     // processing state
        int dummy;
        rrgroup_t* rrg;
+       int     idx;
 
 
-       if( ! more ) {                          // eliminate cheks each time we need to user
+       if( ! more ) {                          // eliminate cheks each time we need to use
                more = &dummy;
        }
 
@@ -287,22 +324,21 @@ static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more,
                        break;
 
                default:                                                                                // need to pick one and adjust rr counts
-                       ep = rrg->epts[rrg->ep_idx++];                          // select next endpoint
+
+                       idx = rrg->ep_idx++ % rrg->nused;                       // see note above
+                       ep = rrg->epts[idx];                                            // select next endpoint
                        //if( DEBUG ) fprintf( stderr, ">>>> _rr returning socket with multiple choices in group idx=%d \n", rrg->ep_idx );
-                       if( rrg->ep_idx >= rrg->nused ) {
-                               rrg->ep_idx = 0;
-                       }
-                       state = rrg->ep_idx+1;
+                       state = idx + 1;                                                        // unit test checks to see that we're cycling through, so must not just be TRUE
                        break;
        }
 
        if( state ) {                                                                           // end point selected, open if not, get socket either way
                if( ! ep->open ) {                                                              // not connected
                        if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
-                               ep->addr = uta_h2ip( ep->name );
+                               ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
                        }
 
-                       if( uta_link2( ep->addr, &ep->nn_sock, &ep->dialer ) ) {                // find entry in table and create link
+                       if( uta_link2( ep ) ) {                                                                                 // find entry in table and create link
                                ep->open = TRUE;
                                *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
                        } else {