Correct bug in mt call mode receive with timeout
[ric-plt/lib/rmr.git] / src / rmr / nng / src / rtable_nng_static.c
index fefa63d..4f413be 100644 (file)
 */
 //static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer, pthread_mutex* gate ) {
 static int uta_link2( endpoint_t* ep ) {
+       static int      flags = -1;
+
        char*           target;
        nng_socket*     nn_sock;
        nng_dialer*     dialer;
        char            conn_info[NNG_MAXADDRLEN];      // string to give to nano to make the connection
        char*           addr;
        int                     state = FALSE;
+       char*           tok;
 
        if( ep == NULL ) {
                return FALSE;
        }
 
-       target = ep->addr;
+       if( flags < 0 ) {
+               tok = getenv( "RMR_ASYNC_CONN" );
+               if( tok == NULL || *tok == '1' ) {
+                       flags = NNG_FLAG_NONBLOCK;                              // start dialer asynch
+               } else {
+                       flags = NO_FLAGS;
+               }
+       }
+
+       target = ep->name;                              // always give name to transport so chaning dest IP does not break reconnect
        nn_sock = &ep->nn_sock;
        dialer = &ep->dialer;
 
@@ -102,7 +114,7 @@ static int uta_link2( endpoint_t* ep ) {
        snprintf( conn_info, sizeof( conn_info ), "tcp://%s", target );
        if( (state = nng_dialer_create( dialer, *nn_sock, conn_info )) != 0 ) {
                pthread_mutex_unlock( &ep->gate );
-               fprintf( stderr, "[WARN] rmr: link2: unable to create dialer for link to target: %s: %d\n", target, errno );
+               fprintf( stderr, "[WRN] rmr: link2: unable to create dialer for link to target: %s: %d\n", target, errno );
                nng_close( *nn_sock );
                return FALSE;
        }
@@ -110,9 +122,9 @@ static int uta_link2( endpoint_t* ep ) {
        nng_dialer_setopt_ms( *dialer,  NNG_OPT_RECONNMAXT, 2000 );             // cap backoff on retries to reasonable amount (2s)
        nng_dialer_setopt_ms( *dialer,  NNG_OPT_RECONNMINT, 100 );              // start retry 100m after last failure with 2s cap
 
-       if( (state = nng_dialer_start( *dialer, NO_FLAGS )) != 0 ) {                                            // can fail immediatly (unlike nanomsg)
+       if( (state = nng_dialer_start( *dialer, flags )) != 0 ) {                                               // can fail immediatly (unlike nanomsg)
                pthread_mutex_unlock( &ep->gate );
-               fprintf( stderr, "[WARN] rmr: unable to create link to target: %s: %s\n", target, nng_strerror( state ) );
+               fprintf( stderr, "[WRN] rmr: unable to create link to target: %s: %s\n", target, nng_strerror( state ) );
                nng_close( *nn_sock );
                return FALSE;
        }
@@ -154,24 +166,24 @@ extern endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n
        rrgroup_t* rrg;                         // pointer at group to update
 
        if( ! rte || ! rt ) {
-               fprintf( stderr, "[WARN] uda_add_ep didn't get a valid rt and/or rte pointer\n" );
+               fprintf( stderr, "[WRN] uda_add_ep didn't get a valid rt and/or rte pointer\n" );
                return NULL;
        }
 
        if( rte->nrrgroups <= group ) {
-               fprintf( stderr, "[WARN] uda_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups );
+               fprintf( stderr, "[WRN] uda_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups );
                return NULL;
        }
 
        if( (rrg = rte->rrgroups[group]) == NULL ) {
                if( (rrg = (rrgroup_t *) malloc( sizeof( *rrg ) )) == NULL ) {
-                       fprintf( stderr, "[WARN] rmr_add_ep: malloc failed for round robin group: group=%d\n", group );
+                       fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for round robin group: group=%d\n", group );
                        return NULL;
                }
                memset( rrg, 0, sizeof( *rrg ) );
 
                if( (rrg->epts = (endpoint_t **) malloc( sizeof( endpoint_t ) * MAX_EP_GROUP )) == NULL ) {
-                       fprintf( stderr, "[WARN] rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group );
+                       fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group );
                        return NULL;
                }
                memset( rrg->epts, 0, sizeof( endpoint_t ) * MAX_EP_GROUP );
@@ -188,7 +200,7 @@ extern endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n
        if( rrg != NULL ) {
                if( rrg->nused >= rrg->nendpts ) {
                        // future: reallocate
-                       fprintf( stderr, "[WARN] endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group );
+                       fprintf( stderr, "[WRN] endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group );
                        return NULL;
                }
 
@@ -244,7 +256,7 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_s
 /*
        Make a round robin selection within a round robin group for a route table
        entry. Returns the nanomsg socket if there is a rte for the message
-       type, and group is defined. Socket is returned via pointer in the parm
+       key, and group is defined. Socket is returned via pointer in the parm
        list (nn_sock).
 
        The group is the group number to select from.
@@ -267,8 +279,8 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_s
                Both of these, in the grand scheme of things, is minor compared to the
                overhead of grabbing a lock on each call.
 */
-static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more, nng_socket* nn_sock ) {
-       rtable_ent_t* rte;                      // matching rt entry
+static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, nng_socket* nn_sock ) {
+       //rtable_ent_t* rte;                    // matching rt entry
        endpoint_t*     ep;                             // seected end point
        int  state = FALSE;                     // processing state
        int dummy;
@@ -286,25 +298,19 @@ static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more,
                return FALSE;
        }
 
-       if( rt == NULL ) {
+       if( rte == NULL ) {
                *more = 0;
                return FALSE;
        }
 
-       if( (rte = rmr_sym_pull( rt->hash, key )) == NULL ) {
-               *more = 0;
-               //if( DEBUG ) fprintf( stderr, ">>>> rte not found for type = %lu\n", key );
-               return FALSE;
-       }
-
        if( group < 0 || group >= rte->nrrgroups ) {
-               //if( DEBUG ) fprintf( stderr, ">>>> group out of range: key=%lu group=%d max=%d\n", key, group, rte->nrrgroups );
+               //if( DEBUG ) fprintf( stderr, ">>>> group out of range: group=%d max=%d\n", group, rte->nrrgroups );
                *more = 0;
                return FALSE;
        }
 
        if( (rrg = rte->rrgroups[group]) == NULL ) {
-               //if( DEBUG ) fprintf( stderr, ">>>> rrg not found for type key=%lu\n", key );
+               //if( DEBUG ) fprintf( stderr, ">>>> rrg not found for group \n", group );
                *more = 0;                                      // groups are inserted contig, so nothing should be after a nil pointer
                return FALSE;
        }
@@ -316,8 +322,7 @@ static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more,
                        //if( DEBUG ) fprintf( stderr, ">>>> nothing allocated for the rrg\n" );
                        return FALSE;
 
-               case 1:                         // exactly one, no rr to deal with and more is not possible even if fanout > 1
-                       //*nn_sock = rrg->epts[0]->nn_sock;
+               case 1:                         // exactly one, no rr to deal with
                        ep = rrg->epts[0];
                        //if( DEBUG ) fprintf( stderr, ">>>> _rr returning socket with one choice in group \n" );
                        state = TRUE;
@@ -353,4 +358,30 @@ static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more,
        return state;
 }
 
+/*
+       Finds the rtable entry which matches the key. Returns a nil pointer if
+       no entry is found. If try_alternate is set, then we will attempt 
+       to find the entry with a key based only on the message type.
+*/
+static inline rtable_ent_t*  uta_get_rte( route_table_t *rt, int sid, int mtype, int try_alt ) {
+       uint64_t key;                   // key is sub id and mtype banged together
+       rtable_ent_t* rte;              // the entry we found
+
+       if( rt == NULL || rt->hash == NULL ) {
+               return NULL;
+       }
+
+       key = build_rt_key( sid, mtype );                                                                                       // first try with a 'full' key
+       if( ((rte = rmr_sym_pull( rt->hash, key )) != NULL)  ||  ! try_alt ) {          // found or not allowed to try the alternate, return what we have
+               return rte;
+       }
+
+       if( sid != UNSET_SUBID ) {                                                              // not found, and allowed to try alternate; and the sub_id was set
+               key = build_rt_key( UNSET_SUBID, mtype );                       // rebuild key
+               rte = rmr_sym_pull( rt->hash, key );                            // see what we get with this
+       }
+
+       return rte;
+}
+
 #endif