Add ability to control route table req frequency
[ric-plt/lib/rmr.git] / src / rmr / si / src / rtable_si_static.c
index cf83590..719053a 100644 (file)
@@ -53,8 +53,8 @@
 */
 static void uta_ep_failed( endpoint_t* ep ) {
        if( ep != NULL ) {
-               if( DEBUG ) fprintf( stderr, "[DBUG] connection to %s was closed\n", ep->name );
-               ep->open = 0;
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "connection to %s was closed\n", ep->name );
+               ep->open = FALSE;
        }
 }
 
@@ -69,9 +69,9 @@ static void uta_ep_failed( endpoint_t* ep ) {
        get things into a bad state if we allow a collision here.  The lock grab
        only happens on the intial session setup.
 */
-static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) {
+//static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) {
+static int uta_link2( uta_ctx_t *ctx, endpoint_t* ep ) {
        static int      flags = 0;
-
        char*           target;
        char            conn_info[SI_MAX_ADDR_LEN];     // string to give to nano to make the connection
        char*           addr;
@@ -79,14 +79,14 @@ static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) {
        char*           tok;
 
        if( ep == NULL ) {
-               if( DEBUG ) fprintf( stderr, "[DBUG] link2 ep was nil!\n" );
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "link2 ep was nil!\n" );
                return FALSE;
        }
 
        target = ep->name;                              // always give name to transport so changing dest IP does not break reconnect
        if( target == NULL  ||  (addr = strchr( target, ':' )) == NULL ) {              // bad address:port
                if( ep->notify ) {
-                       fprintf( stderr, "[WARN] rmr: link2: unable to create link: bad target: %s\n", target == NULL ? "<nil>" : target );
+                       rmr_vlog( RMR_VL_WARN, "rmr: link2: unable to create link: bad target: %s\n", target == NULL ? "<nil>" : target );
                        ep->notify = 0;
                }
                return FALSE;
@@ -100,24 +100,24 @@ static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) {
 
        snprintf( conn_info, sizeof( conn_info ), "%s", target );
        errno = 0;
-       if( DEBUG > 1 ) fprintf( stderr, "[DBUG] link2 attempting connection with: %s\n", conn_info );
-       if( (ep->nn_sock = SIconnect( si_ctx, conn_info )) < 0 ) {
+       if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "link2 attempting connection with: %s\n", conn_info );
+       if( (ep->nn_sock = SIconnect( ctx->si_ctx, conn_info )) < 0 ) {
                pthread_mutex_unlock( &ep->gate );
 
                if( ep->notify ) {                                                      // need to notify if set
-                       fprintf( stderr, "[WRN] rmr: link2: unable to connect  to target: %s: %d %s\n", target, errno, strerror( errno ) );
+                       rmr_vlog( RMR_VL_WARN, "rmr: link2: unable to connect  to target: %s: %d %s\n", target, errno, strerror( errno ) );
                        ep->notify = 0;
                }
-               //nng_close( *nn_sock );
                return FALSE;
        }
 
-       if( DEBUG ) fprintf( stderr, "[INFO] rmr_si_link2: connection was successful to: %s\n", target );
+       if( DEBUG ) rmr_vlog( RMR_VL_INFO, "rmr_si_link2: connection was successful to: %s\n", target );
 
        ep->open = TRUE;                                                // set open/notify before giving up lock
+       fd2ep_add( ctx, ep->nn_sock, ep );              // map fd to ep for disc cleanup (while we have the lock)
 
        if( ! ep->notify ) {                                            // if we yammered about a failure, indicate finally good
-               fprintf( stderr, "[INFO] rmr: link2: connection finally establisehd with target: %s\n", target );
+               rmr_vlog( RMR_VL_INFO, "rmr: link2: connection finally establisehd with target: %s\n", target );
                ep->notify = 1;
        }
 
@@ -144,7 +144,7 @@ static int rt_link2_ep( void* vctx, endpoint_t* ep ) {
                return FALSE;
        }
 
-       uta_link2( ctx->si_ctx, ep );
+       uta_link2( ctx, ep );
        return ep->open;
 }
 
@@ -161,25 +161,25 @@ extern endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n
        rrgroup_t* rrg;                         // pointer at group to update
 
        if( ! rte || ! rt ) {
-               fprintf( stderr, "[WRN] uda_add_ep didn't get a valid rt and/or rte pointer\n" );
+               rmr_vlog( RMR_VL_WARN, "uda_add_ep didn't get a valid rt and/or rte pointer\n" );
                return NULL;
        }
 
        if( rte->nrrgroups <= group || group < 0 ) {
-               fprintf( stderr, "[WRN] uda_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups );
+               rmr_vlog( RMR_VL_WARN, "uda_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups );
                return NULL;
        }
 
        //fprintf( stderr, ">>>> add ep grp=%d to rte @ 0x%p  rrg=%p\n", group, rte, rte->rrgroups[group] );
        if( (rrg = rte->rrgroups[group]) == NULL ) {
                if( (rrg = (rrgroup_t *) malloc( sizeof( *rrg ) )) == NULL ) {
-                       fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for round robin group: group=%d\n", group );
+                       rmr_vlog( RMR_VL_WARN, "rmr_add_ep: malloc failed for round robin group: group=%d\n", group );
                        return NULL;
                }
                memset( rrg, 0, sizeof( *rrg ) );
 
                if( (rrg->epts = (endpoint_t **) malloc( sizeof( endpoint_t ) * MAX_EP_GROUP )) == NULL ) {
-                       fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group );
+                       rmr_vlog( RMR_VL_WARN, "rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group );
                        return NULL;
                }
                memset( rrg->epts, 0, sizeof( endpoint_t ) * MAX_EP_GROUP );
@@ -191,7 +191,7 @@ extern endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n
                rrg->nused = 0;                                                 // number populated
                rrg->nendpts = MAX_EP_GROUP;                    // number allocated
 
-               if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rrg added to rte: mtype=%d group=%d\n", rte->mtype, group );
+               if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rrg added to rte: mtype=%d group=%d\n", rte->mtype, group );
        }
 
        ep = rt_ensure_ep( rt, ep_name );                       // get the ep and create one if not known
@@ -199,7 +199,7 @@ extern endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n
        if( rrg != NULL ) {
                if( rrg->nused >= rrg->nendpts ) {
                        // future: reallocate
-                       fprintf( stderr, "[WRN] endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group );
+                       rmr_vlog( RMR_VL_WARN, "endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group );
                        return NULL;
                }
 
@@ -207,7 +207,7 @@ extern endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n
                rrg->nused++;
        }
 
-       if( DEBUG > 1 ) fprintf( stderr, "[DBUG] endpoint added to mtype/group: %d/%d %s nused=%d\n", rte->mtype, group, ep_name, rrg->nused );
+       if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "endpoint added to mtype/group: %d/%d %s nused=%d\n", rte->mtype, group, ep_name, rrg->nused );
        return ep;
 }
 
@@ -217,36 +217,46 @@ extern endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n
        the user pointer passed in and sets the return value to true (1). If the
        endpoint cannot be found false (0) is returned.
 */
-static int uta_epsock_byname( route_table_t* rt, char* ep_name, int* nn_sock, endpoint_t** uepp, si_ctx_t* si_ctx ) {
-       endpoint_t* ep;
-       int state = FALSE;
-
-       if( rt == NULL ) {
-               return FALSE;
+static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpoint_t** uepp ) {
+       route_table_t*  rt = NULL;
+       si_ctx_t*               si_ctx;
+       endpoint_t*             ep;
+       int                             state = FALSE;
+
+       if( PARANOID_CHECKS ) {
+               if( ctx == NULL || (rt = ctx->rtable) == NULL || (si_ctx = ctx->si_ctx) == NULL  ) {
+                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_byname: parinoia check pop ctx=%p rt=%p\n", ctx, rt );
+                       return FALSE;
+               }
+       } else {
+               rt = ctx->rtable;                               // faster but more risky
+               si_ctx = ctx->si_ctx;
        }
 
        ep =  rmr_sym_get( rt->hash, ep_name, 1 );
+       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_byname: ep not found: %s\n", ep_name );
        if( uepp != NULL ) {                                                    // caller needs endpoint too, give it back
                *uepp = ep;
        }
        if( ep == NULL ) {
-               if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s not in hash!\n", ep_name );
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "get ep by name for %s not in hash!\n", ep_name );
                if( ! ep_name || (ep = rt_ensure_ep( rt, ep_name)) == NULL ) {                          // create one if not in rt (support rts without entry in our table)
                        return FALSE;
                }
        }
 
        if( ! ep->open )  {                                                                             // not open -- connect now
-               if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s session not started... starting\n", ep_name );
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "get ep by name for %s session not started... starting\n", ep_name );
                if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
                        ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
                }
-               if( uta_link2( si_ctx, ep ) ) {                                                                                 // find entry in table and create link
+               if( uta_link2( ctx, ep ) ) {                                                                                    // find entry in table and create link
                        state = TRUE;
                        ep->open = TRUE;
                        *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
+                       fd2ep_add( ctx, ep->nn_sock, ep );                              // map fd to this ep for disc cleanup
                }
-               if( DEBUG ) fprintf( stderr, "[DBUG] epsock_bn: connection state: %s %s\n", state ? "[OK]" : "[FAIL]", ep->name );
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_bn: connection state: %s %s\n", state ? "[OK]" : "[FAIL]", ep->name );
        } else {
                *nn_sock = ep->nn_sock;
                state = TRUE;
@@ -281,13 +291,22 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, int* nn_sock, en
                Both of these, in the grand scheme of things, is minor compared to the
                overhead of grabbing a lock on each call.
 */
-static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, int* nn_sock, endpoint_t** uepp, si_ctx_t* si_ctx ) {
-       endpoint_t*     ep;                             // seected end point
+static int uta_epsock_rr( uta_ctx_t* ctx, rtable_ent_t* rte, int group, int* more, int* nn_sock, endpoint_t** uepp ) {
+       si_ctx_t*               si_ctx;
+       endpoint_t*     ep;                             // selected end point
        int  state = FALSE;                     // processing state
        int dummy;
        rrgroup_t* rrg;
        int     idx;
 
+       if( PARANOID_CHECKS ) {
+               if( ctx == NULL || (si_ctx = ctx->si_ctx) == NULL  ) {
+                       return FALSE;
+               }
+       } else {
+               si_ctx = ctx->si_ctx;
+       }
+
        //fprintf( stderr, ">>>> epsock_rr selecting: grp=%d mtype=%d ngrps=%d\n", group, rte->mtype, rte->nrrgroups );
 
        if( ! more ) {                          // eliminate cheks each time we need to use
@@ -295,26 +314,26 @@ static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, int* nn_sock,
        }
 
        if( ! nn_sock ) {                       // user didn't supply a pointer
-               if( DEBUG ) fprintf( stderr, "[DBUG] epsock_rr invalid nnsock pointer\n" );
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr invalid nnsock pointer\n" );
                errno = EINVAL;
                *more = 0;
                return FALSE;
        }
 
        if( rte == NULL ) {
-               if( DEBUG ) fprintf( stderr, "[DBUG] epsock_rr rte was nil; nothing selected\n" );
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr rte was nil; nothing selected\n" );
                *more = 0;
                return FALSE;
        }
 
        if( group < 0 || group >= rte->nrrgroups ) {
-               if( DEBUG > 1 ) fprintf( stderr, "[DBUG] group out of range: group=%d max=%d\n", group, rte->nrrgroups );
+               if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "group out of range: group=%d max=%d\n", group, rte->nrrgroups );
                *more = 0;
                return FALSE;
        }
 
        if( (rrg = rte->rrgroups[group]) == NULL ) {
-               if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rrg not found for group %d (ptr rrgroups[g] == nil)\n", group );
+               if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rrg not found for group %d (ptr rrgroups[g] == nil)\n", group );
                *more = 0;                                      // groups are inserted contig, so nothing should be after a nil pointer
                return FALSE;
        }
@@ -323,19 +342,19 @@ static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, int* nn_sock,
 
        switch( rrg->nused ) {
                case 0:                         // nothing allocated, just punt
-                       if( DEBUG > 1 ) fprintf( stderr, "[DBUG] nothing allocated for the rrg\n" );
+                       if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "nothing allocated for the rrg\n" );
                        return FALSE;
 
                case 1:                         // exactly one, no rr to deal with
                        ep = rrg->epts[0];
-                       if( DEBUG > 1 ) fprintf( stderr, "[DBUG] _rr returning socket with one choice in group \n" );
+                       if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "_rr returning socket with one choice in group \n" );
                        state = TRUE;
                        break;
 
                default:                                                                                // need to pick one and adjust rr counts
                        idx = rrg->ep_idx++ % rrg->nused;                       // see note above
                        ep = rrg->epts[idx];                                            // select next endpoint
-                       if( DEBUG > 1 ) fprintf( stderr, "[DBUG] _rr returning socket with multiple choices in group idx=%d \n", rrg->ep_idx );
+                       if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "_rr returning socket with multiple choices in group idx=%d \n", rrg->ep_idx );
                        state = idx + 1;                                                        // unit test checks to see that we're cycling through, so must not just be TRUE
                        break;
        }
@@ -345,30 +364,94 @@ static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, int* nn_sock,
        }
        if( state ) {                                                                           // end point selected, open if not, get socket either way
                if( ! ep->open ) {                                                              // not connected
-                       if( DEBUG ) fprintf( stderr, "[DBUG] epsock_rr selected endpoint not yet open; opening %s\n", ep->name );
+                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr selected endpoint not yet open; opening %s\n", ep->name );
                        if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
                                ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
                        }
 
-                       if( uta_link2( si_ctx, ep ) ) {                                                                                 // find entry in table and create link
+                       if( uta_link2( ctx, ep ) ) {                                                                                    // find entry in table and create link
                                ep->open = TRUE;
                                *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
+                               fd2ep_add( ctx, ep->nn_sock, ep );                              // map fd to ep for disc cleanup
                        } else {
                                state = FALSE;
                        }
-                       if( DEBUG ) fprintf( stderr, "[DBUG] epsock_rr: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" );
+                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" );
                } else {
                        *nn_sock = ep->nn_sock;
                }
        }
 
-       if( DEBUG > 1 ) fprintf( stderr, "[DBUG] epsock_rr returns state=%d\n", state );
+       if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr returns state=%d\n", state );
+       return state;
+}
+
+/*
+       Given a message, use the meid field to find the owner endpoint for the meid.
+       The owner ep is then used to extract the socket through which the message
+       is sent. This returns TRUE if we found a socket and it was written to the
+       nn_sock pointer; false if we didn't.
+
+       We've been told that the meid is a string, thus we count on it being a nil
+       terminated set of bytes.
+
+       If we return false the caller's ep reference may NOT be valid or even nil.
+*/
+static int epsock_meid( uta_ctx_t* ctx, route_table_t *rtable, rmr_mbuf_t* msg, int* nn_sock, endpoint_t** uepp ) {
+       endpoint_t*     ep;                             // seected end point
+       int     state = FALSE;                  // processing state
+       char*   meid;
+       si_ctx_t*       si_ctx;
+
+       if( PARANOID_CHECKS ) {
+               if( ctx == NULL || (si_ctx = ctx->si_ctx) == NULL  ) {
+                       return FALSE;
+               }
+       } else {
+               si_ctx = ctx->si_ctx;
+       }
+
+       errno = 0;
+       if( ! nn_sock || msg == NULL || rtable == NULL ) {                      // missing stuff; bail fast
+               errno = EINVAL;
+               return FALSE;
+       }
+
+       meid = ((uta_mhdr_t *) msg->header)->meid;
+
+       ep = get_meid_owner( rtable, meid );
+       if( uepp != NULL ) {                                                            // caller needs refernce to endpoint too
+               *uepp = ep;
+       }
+
+       if( ep == NULL ) {
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_meid: no ep in hash for (%s)\n", meid );
+               return FALSE;
+       }
+
+       state = TRUE;
+       if( ! ep->open ) {                                                              // not connected
+               if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
+                       ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
+               }
+
+               if( uta_link2( ctx, ep ) ) {                            // find entry in table and create link
+                       ep->open = TRUE;
+                       *nn_sock = ep->nn_sock;                                 // pass socket back to caller
+               } else {
+                       state = FALSE;
+               }
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_meid: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" );
+       } else {
+               *nn_sock = ep->nn_sock;
+       }
+
        return state;
 }
 
 /*
        Finds the rtable entry which matches the key. Returns a nil pointer if
-       no entry is found. If try_alternate is set, then we will attempt 
+       no entry is found. If try_alternate is set, then we will attempt
        to find the entry with a key based only on the message type.
 */
 static inline rtable_ent_t*  uta_get_rte( route_table_t *rt, int sid, int mtype, int try_alt ) {
@@ -423,4 +506,84 @@ static inline char* get_ep_counts( endpoint_t* ep, char* ubuf, int ubuf_len ) {
        return rs;
 }
 
+
+// ---- fd to ep functions --------------------------------------------------------------------------
+
+/*
+       Create the hash which maps file descriptors to endpoints. We need this
+       to easily mark an endpoint as disconnected when we are notified. Thus we
+       expect these to be driven very seldomly; locking should not be an issue.
+       Locking is needed to prevent problems when the user application is multi-
+       threaded and attempting to (re)connect from concurrent threads.
+*/
+static void fd2ep_init( uta_ctx_t* ctx ) {
+
+       if( ctx  && ! ctx->fd2ep ) {
+               ctx->fd2ep = rmr_sym_alloc( 129 );
+
+               if( ctx->fd2ep_gate == NULL ) {
+                       ctx->fd2ep_gate = (pthread_mutex_t *) malloc( sizeof( *ctx->fd2ep_gate ) );
+                       if( ctx->fd2ep_gate != NULL ) {
+                               pthread_mutex_init( ctx->fd2ep_gate, NULL );
+                       }
+               }
+       }
+}
+
+/*
+       Add an entry into the fd2ep hash to map the FD to the endpoint.
+*/
+static void fd2ep_add( uta_ctx_t* ctx, int fd, endpoint_t* ep ) {
+       if( ctx && ctx->fd2ep ) {
+               pthread_mutex_lock( ctx->fd2ep_gate );
+
+               rmr_sym_map( ctx->fd2ep, (uint64_t) fd, (void *) ep );
+
+               pthread_mutex_unlock( ctx->fd2ep_gate );
+       }
+}
+
+/*
+       Given a file descriptor this fetches the related endpoint from the hash and
+       deletes the entry from the hash (when we detect a disconnect).
+
+       This will also set the state on the ep open to false, and revoke the
+       FD (nn_socket).
+*/
+static endpoint_t*  fd2ep_del( uta_ctx_t* ctx, int fd ) {
+       endpoint_t* ep = NULL;
+
+       if( ctx && ctx->fd2ep ) {
+               ep = rmr_sym_pull(  ctx->fd2ep, (uint64_t) fd );
+               if( ep ) {
+                       pthread_mutex_lock( ctx->fd2ep_gate );
+
+                       rmr_sym_ndel(  ctx->fd2ep, (uint64_t) fd );
+
+                       pthread_mutex_unlock( ctx->fd2ep_gate );
+               }
+       }
+
+       return ep;
+}
+
+/*
+       Given a file descriptor fetches the related endpoint from the hash.
+       Returns nil if there is no reference in the hash.
+*/
+static endpoint_t*  fd2ep_get( uta_ctx_t* ctx, int fd ) {
+       endpoint_t* ep = NULL;
+
+       if( ctx && ctx->fd2ep ) {
+               pthread_mutex_lock( ctx->fd2ep_gate );
+
+               ep = rmr_sym_pull(  ctx->fd2ep, (uint64_t) fd );
+
+               pthread_mutex_unlock( ctx->fd2ep_gate );
+       }
+
+       return ep;
+}
+
+
 #endif