doc(log): Make err/cri/wrn tags consistent
[ric-plt/lib/rmr.git] / src / rmr / nng / src / rtable_nng_static.c
index 6122f69..875cfe6 100644 (file)
        Target assumed to be address:port.  The new socket is returned via the
        user supplied pointer so that a success/fail code is returned directly.
        Return value is 0 (false) on failure, 1 (true)  on success.
+
+       In order to support multi-threaded user applications we must hold a lock before
+       we attempt to create a dialer and connect. NNG is thread safe, but we can
+       get things into a bad state if we allow a collision here.  The lock grab
+       only happens on the intial session setup.
 */
-static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer ) {
+//static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer, pthread_mutex* gate ) {
+static int uta_link2( endpoint_t* ep ) {
+       char*           target;
+       nng_socket*     nn_sock;
+       nng_dialer*     dialer;
        char            conn_info[NNG_MAXADDRLEN];      // string to give to nano to make the connection
        char*           addr;
        int                     state = FALSE;
 
+       if( ep == NULL ) {
+               return FALSE;
+       }
+
+       target = ep->addr;
+       nn_sock = &ep->nn_sock;
+       dialer = &ep->dialer;
+
        if( target == NULL  ||  (addr = strchr( target, ':' )) == NULL ) {              // bad address:port
                fprintf( stderr, "rmr: link2: unable to create link: bad target: %s\n", target == NULL ? "<nil>" : target );
                return FALSE;
@@ -69,14 +86,23 @@ static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer ) {
                return FALSE;
        }
 
+       pthread_mutex_lock( &ep->gate );                        // grab the lock
+       if( ep->open ) {
+               pthread_mutex_unlock( &ep->gate );
+               return TRUE;
+       }
+
+
        if( nng_push0_open( nn_sock ) != 0 ) {                  // and assign the mode
+               pthread_mutex_unlock( &ep->gate );
                fprintf( stderr, "[CRI] rmr: link2: unable to initialise nanomsg push socket to: %s\n", target );
                return FALSE;
        }
 
        snprintf( conn_info, sizeof( conn_info ), "tcp://%s", target );
        if( (state = nng_dialer_create( dialer, *nn_sock, conn_info )) != 0 ) {
-               fprintf( stderr, "[WARN] rmr: link2: unable to create dialer for link to target: %s: %d\n", target, errno );
+               pthread_mutex_unlock( &ep->gate );
+               fprintf( stderr, "[WRN] rmr: link2: unable to create dialer for link to target: %s: %d\n", target, errno );
                nng_close( *nn_sock );
                return FALSE;
        }
@@ -85,13 +111,16 @@ static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer ) {
        nng_dialer_setopt_ms( *dialer,  NNG_OPT_RECONNMINT, 100 );              // start retry 100m after last failure with 2s cap
 
        if( (state = nng_dialer_start( *dialer, NO_FLAGS )) != 0 ) {                                            // can fail immediatly (unlike nanomsg)
-               fprintf( stderr, "[WARN] rmr: unable to create link to target: %s: %s\n", target, nng_strerror( state ) );
+               pthread_mutex_unlock( &ep->gate );
+               fprintf( stderr, "[WRN] rmr: unable to create link to target: %s: %s\n", target, nng_strerror( state ) );
                nng_close( *nn_sock );
                return FALSE;
        }
 
        if( DEBUG ) fprintf( stderr, "[INFO] rmr_link2l: dial was successful: %s\n", target );
 
+       ep->open = TRUE;                                                // must set before release
+       pthread_mutex_unlock( &ep->gate );
        return TRUE;
 }
 
@@ -108,7 +137,7 @@ static int rt_link2_ep( endpoint_t* ep ) {
                return TRUE;
        }
 
-       ep->open =  uta_link2( ep->addr, &ep->nn_sock, &ep->dialer );
+       uta_link2( ep );
        return ep->open;
 }
 
@@ -125,24 +154,24 @@ extern endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n
        rrgroup_t* rrg;                         // pointer at group to update
 
        if( ! rte || ! rt ) {
-               fprintf( stderr, "[WARN] uda_add_ep didn't get a valid rt and/or rte pointer\n" );
+               fprintf( stderr, "[WRN] uda_add_ep didn't get a valid rt and/or rte pointer\n" );
                return NULL;
        }
 
        if( rte->nrrgroups <= group ) {
-               fprintf( stderr, "[WARN] uda_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups );
+               fprintf( stderr, "[WRN] uda_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups );
                return NULL;
        }
 
        if( (rrg = rte->rrgroups[group]) == NULL ) {
                if( (rrg = (rrgroup_t *) malloc( sizeof( *rrg ) )) == NULL ) {
-                       fprintf( stderr, "[WARN] rmr_add_ep: malloc failed for round robin group: group=%d\n", group );
+                       fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for round robin group: group=%d\n", group );
                        return NULL;
                }
                memset( rrg, 0, sizeof( *rrg ) );
 
                if( (rrg->epts = (endpoint_t **) malloc( sizeof( endpoint_t ) * MAX_EP_GROUP )) == NULL ) {
-                       fprintf( stderr, "[WARN] rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group );
+                       fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group );
                        return NULL;
                }
                memset( rrg->epts, 0, sizeof( endpoint_t ) * MAX_EP_GROUP );
@@ -159,7 +188,7 @@ extern endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n
        if( rrg != NULL ) {
                if( rrg->nused >= rrg->nendpts ) {
                        // future: reallocate
-                       fprintf( stderr, "[WARN] endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group );
+                       fprintf( stderr, "[WRN] endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group );
                        return NULL;
                }
 
@@ -196,9 +225,9 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_s
        if( ! ep->open )  {                                                                             // not open -- connect now
                if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s session not started... starting\n", ep_name );
                if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
-                       ep->addr = uta_h2ip( ep->name );
+                       ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
                }
-               if( uta_link2( ep->addr, &ep->nn_sock, &ep->dialer ) ) {                // find entry in table and create link
+               if( uta_link2( ep ) ) {                                                                                 // find entry in table and create link
                        state = TRUE;
                        ep->open = TRUE;
                        *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
@@ -230,6 +259,13 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_s
        We return the index+1 from the round robin table on success so that we can verify
        during test that different entries are being seleted; we cannot depend on the nng
        socket being different as we could with nano.
+
+       NOTE:   The round robin selection index increment might collide with other
+               threads if multiple threads are attempting to send to the same round
+               robin group; the consequences are small and avoid locking. The only side
+               effect is either sending two messages in a row to, or skipping, an endpoint.
+               Both of these, in the grand scheme of things, is minor compared to the
+               overhead of grabbing a lock on each call.
 */
 static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more, nng_socket* nn_sock ) {
        rtable_ent_t* rte;                      // matching rt entry
@@ -237,9 +273,10 @@ static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more,
        int  state = FALSE;                     // processing state
        int dummy;
        rrgroup_t* rrg;
+       int     idx;
 
 
-       if( ! more ) {                          // eliminate cheks each time we need to user
+       if( ! more ) {                          // eliminate cheks each time we need to use
                more = &dummy;
        }
 
@@ -287,22 +324,21 @@ static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more,
                        break;
 
                default:                                                                                // need to pick one and adjust rr counts
-                       ep = rrg->epts[rrg->ep_idx++];                          // select next endpoint
+
+                       idx = rrg->ep_idx++ % rrg->nused;                       // see note above
+                       ep = rrg->epts[idx];                                            // select next endpoint
                        //if( DEBUG ) fprintf( stderr, ">>>> _rr returning socket with multiple choices in group idx=%d \n", rrg->ep_idx );
-                       if( rrg->ep_idx >= rrg->nused ) {
-                               rrg->ep_idx = 0;
-                       }
-                       state = rrg->ep_idx+1;
+                       state = idx + 1;                                                        // unit test checks to see that we're cycling through, so must not just be TRUE
                        break;
        }
 
        if( state ) {                                                                           // end point selected, open if not, get socket either way
                if( ! ep->open ) {                                                              // not connected
                        if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
-                               ep->addr = uta_h2ip( ep->name );
+                               ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
                        }
 
-                       if( uta_link2( ep->addr, &ep->nn_sock, &ep->dialer ) ) {                // find entry in table and create link
+                       if( uta_link2( ep ) ) {                                                                                 // find entry in table and create link
                                ep->open = TRUE;
                                *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
                        } else {