Allow endpoint selection based on meid in message
[ric-plt/lib/rmr.git] / src / rmr / nng / src / rtable_nng_static.c
index da4101b..81e79ca 100644 (file)
@@ -1,4 +1,4 @@
-// : vi ts=4 sw=4 noet :
+// vim: ts=4 sw=4 noet :
 /*
 ==================================================================================
        Copyright (c) 2019 Nokia
        get things into a bad state if we allow a collision here.  The lock grab
        only happens on the intial session setup.
 */
-//static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer, pthread_mutex* gate ) {
 static int uta_link2( endpoint_t* ep ) {
+       static int      flags = -1;
+
        char*           target;
        nng_socket*     nn_sock;
        nng_dialer*     dialer;
        char            conn_info[NNG_MAXADDRLEN];      // string to give to nano to make the connection
        char*           addr;
        int                     state = FALSE;
+       char*           tok;
 
        if( ep == NULL ) {
                return FALSE;
        }
 
-       target = ep->addr;
+       if( flags < 0 ) {
+               tok = getenv( "RMR_ASYNC_CONN" );
+               if( tok == NULL || *tok == '1' ) {
+                       flags = NNG_FLAG_NONBLOCK;                              // start dialer asynch
+               } else {
+                       flags = NO_FLAGS;
+               }
+       }
+
+       target = ep->name;                              // always give name to transport so chaning dest IP does not break reconnect
        nn_sock = &ep->nn_sock;
        dialer = &ep->dialer;
 
@@ -110,7 +121,7 @@ static int uta_link2( endpoint_t* ep ) {
        nng_dialer_setopt_ms( *dialer,  NNG_OPT_RECONNMAXT, 2000 );             // cap backoff on retries to reasonable amount (2s)
        nng_dialer_setopt_ms( *dialer,  NNG_OPT_RECONNMINT, 100 );              // start retry 100m after last failure with 2s cap
 
-       if( (state = nng_dialer_start( *dialer, NO_FLAGS )) != 0 ) {                                            // can fail immediatly (unlike nanomsg)
+       if( (state = nng_dialer_start( *dialer, flags )) != 0 ) {                                               // can fail immediatly (unlike nanomsg)
                pthread_mutex_unlock( &ep->gate );
                fprintf( stderr, "[WRN] rmr: unable to create link to target: %s: %s\n", target, nng_strerror( state ) );
                nng_close( *nn_sock );
@@ -206,7 +217,7 @@ extern endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n
        the user pointer passed in and sets the return value to true (1). If the
        endpoint cannot be found false (0) is returned.
 */
-static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_sock ) {
+static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_sock, endpoint_t** uepp ) {
        endpoint_t* ep;
        int state = FALSE;
 
@@ -215,6 +226,9 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_s
        }
 
        ep =  rmr_sym_get( rt->hash, ep_name, 1 );
+       if( uepp != NULL ) {                                                    // caller needs endpoint too, give it back
+               *uepp = ep;
+       }
        if( ep == NULL ) {
                if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s not in hash!\n", ep_name );
                if( ! ep_name || (ep = rt_ensure_ep( rt, ep_name)) == NULL ) {                          // create one if not in rt (support rts without entry in our table)
@@ -267,8 +281,7 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_s
                Both of these, in the grand scheme of things, is minor compared to the
                overhead of grabbing a lock on each call.
 */
-static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, nng_socket* nn_sock ) {
-       //rtable_ent_t* rte;                    // matching rt entry
+static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, nng_socket* nn_sock, endpoint_t** uepp ) {
        endpoint_t*     ep;                             // seected end point
        int  state = FALSE;                     // processing state
        int dummy;
@@ -325,6 +338,9 @@ static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, nng_socket* n
                        break;
        }
 
+       if( uepp != NULL ) {                                                            // caller needs refernce to endpoint too
+               *uepp = ep;
+       }
        if( state ) {                                                                           // end point selected, open if not, get socket either way
                if( ! ep->open ) {                                                              // not connected
                        if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
@@ -372,4 +388,101 @@ static inline rtable_ent_t*  uta_get_rte( route_table_t *rt, int sid, int mtype,
        return rte;
 }
 
+/*
+       Given a route table and meid string, find the owner (if known). Returns a pointer to
+       the endpoint struct or nil.
+*/
+static inline endpoint_t*  get_meid_owner( route_table_t *rt, char* meid ) {
+       endpoint_t* ep;         // the ep we found in the hash
+
+       if( rt == NULL || rt->hash == NULL || meid == NULL || *meid == 0 ) {
+               return NULL;
+       }
+
+       return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE ); 
+}
+
+/*
+       Return a string of count information. E.g.:
+               <ep-name>:<port> <good> <hard-fail> <soft-fail>
+
+       Caller must free the string allocated if a buffer was not provided.
+
+       Pointer returned is to a freshly allocated string, or the user buffer
+       for convenience.
+
+       If the endpoint passed is a nil pointer, then we return a nil -- caller
+       must check!
+*/
+static inline char* get_ep_counts( endpoint_t* ep, char* ubuf, int ubuf_len ) {
+       char*   rs;                     // result string
+
+       if( ep == NULL ) {
+               return NULL;
+       }
+
+       if( ubuf != NULL ) {
+               rs = ubuf;
+       } else {
+               ubuf_len = 256;
+               rs = malloc( sizeof( char ) * ubuf_len );
+       }
+
+       snprintf( rs, ubuf_len, "%s %lld %lld %lld", ep->name, ep->scounts[EPSC_GOOD], ep->scounts[EPSC_FAIL], ep->scounts[EPSC_TRANS] );
+
+       return rs;
+}
+
+/*
+       Given a message, use the meid field to find the owner endpoint for the meid.
+       The owner ep is then used to extract the socket through which the message
+       is sent. This returns TRUE if we found a socket and it was written to the
+       nn_sock pointer; false if we didn't.
+
+       We've been told that the meid is a string, thus we count on it being a nil
+       terminated set of bytes.
+*/
+static int epsock_meid( route_table_t *rtable, rmr_mbuf_t* msg, nng_socket* nn_sock, endpoint_t** uepp ) {
+       endpoint_t*     ep;                             // seected end point
+       int     state = FALSE;                  // processing state
+       char*   meid;
+
+
+       errno = 0;
+       if( ! nn_sock || msg == NULL || rtable == NULL ) {                      // missing stuff; bail fast
+               errno = EINVAL;
+               return FALSE;
+       }
+
+       meid = ((uta_mhdr_t *) msg->header)->meid;
+
+       if( (ep = get_meid_owner( rtable, meid )) == NULL ) {
+               if( uepp != NULL ) {                                                            // caller needs refernce to endpoint too
+                       *uepp = NULL;
+               }
+
+               if( DEBUG ) fprintf( stderr, "[DBUG] epsock_meid: no ep in hash for (%s)\n", meid );
+               return FALSE;
+       }
+
+       state = TRUE;
+       if( ! ep->open ) {                                                              // not connected
+               if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
+                       ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
+               }
+
+               if( uta_link2( ep ) ) {                                         // find entry in table and create link
+                       ep->open = TRUE;
+                       *nn_sock = ep->nn_sock;                                 // pass socket back to caller
+               } else {
+                       state = FALSE;
+               }
+               if( DEBUG ) fprintf( stderr, "[DBUG] epsock_meid: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" );
+       } else {
+               *nn_sock = ep->nn_sock;
+       }
+
+       return state;
+}
+
 #endif