Add transport provider status to message buffer
[ric-plt/lib/rmr.git] / src / rmr / nng / src / rtable_nng_static.c
index d903de3..875cfe6 100644 (file)
        user supplied pointer so that a success/fail code is returned directly.
        Return value is 0 (false) on failure, 1 (true)  on success.
 
-       In order to support multi-threaded user applications we must hold a lock before 
-       we attempt to create a dialer and connect. NNG is thread safe, but we can 
+       In order to support multi-threaded user applications we must hold a lock before
+       we attempt to create a dialer and connect. NNG is thread safe, but we can
        get things into a bad state if we allow a collision here.  The lock grab
        only happens on the intial session setup.
 */
 //static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer, pthread_mutex* gate ) {
 static int uta_link2( endpoint_t* ep ) {
-       char*           target; 
-       nng_socket*     nn_sock; 
+       char*           target;
+       nng_socket*     nn_sock;
        nng_dialer*     dialer;
        char            conn_info[NNG_MAXADDRLEN];      // string to give to nano to make the connection
        char*           addr;
@@ -91,7 +91,7 @@ static int uta_link2( endpoint_t* ep ) {
                pthread_mutex_unlock( &ep->gate );
                return TRUE;
        }
-       
+
 
        if( nng_push0_open( nn_sock ) != 0 ) {                  // and assign the mode
                pthread_mutex_unlock( &ep->gate );
@@ -102,7 +102,7 @@ static int uta_link2( endpoint_t* ep ) {
        snprintf( conn_info, sizeof( conn_info ), "tcp://%s", target );
        if( (state = nng_dialer_create( dialer, *nn_sock, conn_info )) != 0 ) {
                pthread_mutex_unlock( &ep->gate );
-               fprintf( stderr, "[WARN] rmr: link2: unable to create dialer for link to target: %s: %d\n", target, errno );
+               fprintf( stderr, "[WRN] rmr: link2: unable to create dialer for link to target: %s: %d\n", target, errno );
                nng_close( *nn_sock );
                return FALSE;
        }
@@ -112,7 +112,7 @@ static int uta_link2( endpoint_t* ep ) {
 
        if( (state = nng_dialer_start( *dialer, NO_FLAGS )) != 0 ) {                                            // can fail immediatly (unlike nanomsg)
                pthread_mutex_unlock( &ep->gate );
-               fprintf( stderr, "[WARN] rmr: unable to create link to target: %s: %s\n", target, nng_strerror( state ) );
+               fprintf( stderr, "[WRN] rmr: unable to create link to target: %s: %s\n", target, nng_strerror( state ) );
                nng_close( *nn_sock );
                return FALSE;
        }
@@ -154,24 +154,24 @@ extern endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n
        rrgroup_t* rrg;                         // pointer at group to update
 
        if( ! rte || ! rt ) {
-               fprintf( stderr, "[WARN] uda_add_ep didn't get a valid rt and/or rte pointer\n" );
+               fprintf( stderr, "[WRN] uda_add_ep didn't get a valid rt and/or rte pointer\n" );
                return NULL;
        }
 
        if( rte->nrrgroups <= group ) {
-               fprintf( stderr, "[WARN] uda_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups );
+               fprintf( stderr, "[WRN] uda_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups );
                return NULL;
        }
 
        if( (rrg = rte->rrgroups[group]) == NULL ) {
                if( (rrg = (rrgroup_t *) malloc( sizeof( *rrg ) )) == NULL ) {
-                       fprintf( stderr, "[WARN] rmr_add_ep: malloc failed for round robin group: group=%d\n", group );
+                       fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for round robin group: group=%d\n", group );
                        return NULL;
                }
                memset( rrg, 0, sizeof( *rrg ) );
 
                if( (rrg->epts = (endpoint_t **) malloc( sizeof( endpoint_t ) * MAX_EP_GROUP )) == NULL ) {
-                       fprintf( stderr, "[WARN] rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group );
+                       fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group );
                        return NULL;
                }
                memset( rrg->epts, 0, sizeof( endpoint_t ) * MAX_EP_GROUP );
@@ -188,7 +188,7 @@ extern endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n
        if( rrg != NULL ) {
                if( rrg->nused >= rrg->nendpts ) {
                        // future: reallocate
-                       fprintf( stderr, "[WARN] endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group );
+                       fprintf( stderr, "[WRN] endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group );
                        return NULL;
                }
 
@@ -225,7 +225,7 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_s
        if( ! ep->open )  {                                                                             // not open -- connect now
                if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s session not started... starting\n", ep_name );
                if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
-                       ep->addr = uta_h2ip( ep->name );
+                       ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
                }
                if( uta_link2( ep ) ) {                                                                                 // find entry in table and create link
                        state = TRUE;
@@ -259,6 +259,13 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_s
        We return the index+1 from the round robin table on success so that we can verify
        during test that different entries are being seleted; we cannot depend on the nng
        socket being different as we could with nano.
+
+       NOTE:   The round robin selection index increment might collide with other
+               threads if multiple threads are attempting to send to the same round
+               robin group; the consequences are small and avoid locking. The only side
+               effect is either sending two messages in a row to, or skipping, an endpoint.
+               Both of these, in the grand scheme of things, is minor compared to the
+               overhead of grabbing a lock on each call.
 */
 static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more, nng_socket* nn_sock ) {
        rtable_ent_t* rte;                      // matching rt entry
@@ -266,9 +273,10 @@ static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more,
        int  state = FALSE;                     // processing state
        int dummy;
        rrgroup_t* rrg;
+       int     idx;
 
 
-       if( ! more ) {                          // eliminate cheks each time we need to user
+       if( ! more ) {                          // eliminate cheks each time we need to use
                more = &dummy;
        }
 
@@ -316,19 +324,18 @@ static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more,
                        break;
 
                default:                                                                                // need to pick one and adjust rr counts
-                       ep = rrg->epts[rrg->ep_idx++];                          // select next endpoint
+
+                       idx = rrg->ep_idx++ % rrg->nused;                       // see note above
+                       ep = rrg->epts[idx];                                            // select next endpoint
                        //if( DEBUG ) fprintf( stderr, ">>>> _rr returning socket with multiple choices in group idx=%d \n", rrg->ep_idx );
-                       if( rrg->ep_idx >= rrg->nused ) {
-                               rrg->ep_idx = 0;
-                       }
-                       state = rrg->ep_idx+1;
+                       state = idx + 1;                                                        // unit test checks to see that we're cycling through, so must not just be TRUE
                        break;
        }
 
        if( state ) {                                                                           // end point selected, open if not, get socket either way
                if( ! ep->open ) {                                                              // not connected
                        if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
-                               ep->addr = uta_h2ip( ep->name );
+                               ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
                        }
 
                        if( uta_link2( ep ) ) {                                                                                 // find entry in table and create link