Fixing handling of invalid header size
[ric-plt/lib/rmr.git] / src / rmr / si / src / rtable_si_static.c
index 6215176..f7cddf5 100644 (file)
@@ -69,9 +69,9 @@ static void uta_ep_failed( endpoint_t* ep ) {
        get things into a bad state if we allow a collision here.  The lock grab
        only happens on the intial session setup.
 */
-static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) {
+//static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) {
+static int uta_link2( uta_ctx_t *ctx, endpoint_t* ep ) {
        static int      flags = 0;
-
        char*           target;
        char            conn_info[SI_MAX_ADDR_LEN];     // string to give to nano to make the connection
        char*           addr;
@@ -101,7 +101,7 @@ static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) {
        snprintf( conn_info, sizeof( conn_info ), "%s", target );
        errno = 0;
        if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "link2 attempting connection with: %s\n", conn_info );
-       if( (ep->nn_sock = SIconnect( si_ctx, conn_info )) < 0 ) {
+       if( (ep->nn_sock = SIconnect( ctx->si_ctx, conn_info )) < 0 ) {
                pthread_mutex_unlock( &ep->gate );
 
                if( ep->notify ) {                                                      // need to notify if set
@@ -114,6 +114,7 @@ static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) {
        if( DEBUG ) rmr_vlog( RMR_VL_INFO, "rmr_si_link2: connection was successful to: %s\n", target );
 
        ep->open = TRUE;                                                // set open/notify before giving up lock
+       fd2ep_add( ctx, ep->nn_sock, ep );              // map fd to ep for disc cleanup (while we have the lock)
 
        if( ! ep->notify ) {                                            // if we yammered about a failure, indicate finally good
                rmr_vlog( RMR_VL_INFO, "rmr: link2: connection finally establisehd with target: %s\n", target );
@@ -143,10 +144,7 @@ static int rt_link2_ep( void* vctx, endpoint_t* ep ) {
                return FALSE;
        }
 
-       uta_link2( ctx->si_ctx, ep );
-       if( ep->open ) {
-               fd2ep_add( ctx, ep->nn_sock, ep );              // map fd to ep for disc cleanup
-       }
+       uta_link2( ctx, ep );
        return ep->open;
 }
 
@@ -172,7 +170,6 @@ extern endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n
                return NULL;
        }
 
-       //fprintf( stderr, ">>>> add ep grp=%d to rte @ 0x%p  rrg=%p\n", group, rte, rte->rrgroups[group] );
        if( (rrg = rte->rrgroups[group]) == NULL ) {
                if( (rrg = (rrgroup_t *) malloc( sizeof( *rrg ) )) == NULL ) {
                        rmr_vlog( RMR_VL_WARN, "rmr_add_ep: malloc failed for round robin group: group=%d\n", group );
@@ -180,14 +177,14 @@ extern endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n
                }
                memset( rrg, 0, sizeof( *rrg ) );
 
-               if( (rrg->epts = (endpoint_t **) malloc( sizeof( endpoint_t ) * MAX_EP_GROUP )) == NULL ) {
+               if( (rrg->epts = (endpoint_t **) malloc( sizeof( endpoint_t* ) * MAX_EP_GROUP )) == NULL ) {
                        rmr_vlog( RMR_VL_WARN, "rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group );
+                       free( rrg );
                        return NULL;
                }
-               memset( rrg->epts, 0, sizeof( endpoint_t ) * MAX_EP_GROUP );
+               memset( rrg->epts, 0, sizeof( endpoint_t* ) * MAX_EP_GROUP );
 
                rte->rrgroups[group] = rrg;
-               //fprintf( stderr, ">>>> added new rrg grp=%d to rte @ 0x%p  rrg=%p\n", group, rte, rte->rrgroups[group] );
 
                rrg->ep_idx = 0;                                                // next endpoint to send to
                rrg->nused = 0;                                                 // number populated
@@ -196,7 +193,7 @@ extern endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n
                if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rrg added to rte: mtype=%d group=%d\n", rte->mtype, group );
        }
 
-       ep = rt_ensure_ep( rt, ep_name );                       // get the ep and create one if not known
+       ep = rt_ensure_ep( rt, ep_name );                       // get the ep and create one if not known
 
        if( rrg != NULL ) {
                if( rrg->nused >= rrg->nendpts ) {
@@ -215,42 +212,54 @@ extern endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n
 
 
 /*
-       Given a name, find the nano socket needed to send to it. Returns the socket via
+       Given a name, find the socket fd needed to send to it. Returns the socket via
        the user pointer passed in and sets the return value to true (1). If the
        endpoint cannot be found false (0) is returned.
 */
 static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpoint_t** uepp ) {
-       route_table_t*  rt
-       si_ctx_t*               si_ctx;
+       route_table_t*  rt = NULL;
+       si_ctx_t*               si_ctx = NULL;
        endpoint_t*             ep;
        int                             state = FALSE;
 
-       if( PARINOID_CHECKS ) {
-               if( ctx == NULL || (rt = ctx->rtable) == NULL || (si_ctx = ctx->si_ctx) == NULL  ) {
+       if( PARANOID_CHECKS ) {
+               if( ctx == NULL ) {
+                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_byname: paranoia check pop ctx=%p rt=%p\n", ctx, rt );
+                       return FALSE;
+               }
+               if( (si_ctx = ctx->si_ctx) == NULL ) {
+                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_byname: paranoia check pop sictx is nil\n" );
+                       return FALSE;
+               }
+               if( (rt = get_rt( ctx )) == NULL ) {                            // get active rt and bump ref count
+                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_byname: paranoia check pop no rtable\n" );
                        return FALSE;
                }
        } else {
-               rt = ctx->rtable;                               // faster but more risky
+               rt = get_rt( ctx );                             // get active rt and bump ref count
                si_ctx = ctx->si_ctx;
        }
 
-       ep =  rmr_sym_get( rt->hash, ep_name, 1 );
+       ep =  rmr_sym_get( rt->ephash, ep_name, 1 );
+       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_byname: ep not found: %s\n", ep_name );
        if( uepp != NULL ) {                                                    // caller needs endpoint too, give it back
                *uepp = ep;
        }
        if( ep == NULL ) {
                if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "get ep by name for %s not in hash!\n", ep_name );
                if( ! ep_name || (ep = rt_ensure_ep( rt, ep_name)) == NULL ) {                          // create one if not in rt (support rts without entry in our table)
+                       release_rt( ctx, rt );                                                  // drop ref count
                        return FALSE;
                }
        }
+       release_rt( ctx, rt );                                                                          // drop ref count
 
        if( ! ep->open )  {                                                                             // not open -- connect now
                if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "get ep by name for %s session not started... starting\n", ep_name );
                if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
                        ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
                }
-               if( uta_link2( si_ctx, ep ) ) {                                                                                 // find entry in table and create link
+               if( uta_link2( ctx, ep ) ) {                                                                                    // find entry in table and create link
                        state = TRUE;
                        ep->open = TRUE;
                        *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
@@ -267,7 +276,7 @@ static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpo
 
 /*
        Make a round robin selection within a round robin group for a route table
-       entry. Returns the nanomsg socket if there is a rte for the message
+       entry. Returns the socket fd if there is a rte for the message
        key, and group is defined. Socket is returned via pointer in the parm
        list (nn_sock).
 
@@ -281,8 +290,7 @@ static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpo
        The return value is true (>0) if the socket was found and *nn_sock was updated
        and false (0) if there is no associated socket for the msg type, group combination.
        We return the index+1 from the round robin table on success so that we can verify
-       during test that different entries are being seleted; we cannot depend on the nng
-       socket being different as we could with nano.
+       during test that different entries are being seleted.
 
        NOTE:   The round robin selection index increment might collide with other
                threads if multiple threads are attempting to send to the same round
@@ -299,7 +307,7 @@ static int uta_epsock_rr( uta_ctx_t* ctx, rtable_ent_t* rte, int group, int* mor
        rrgroup_t* rrg;
        int     idx;
 
-       if( PARINOID_CHECKS ) {
+       if( PARANOID_CHECKS ) {
                if( ctx == NULL || (si_ctx = ctx->si_ctx) == NULL  ) {
                        return FALSE;
                }
@@ -307,8 +315,6 @@ static int uta_epsock_rr( uta_ctx_t* ctx, rtable_ent_t* rte, int group, int* mor
                si_ctx = ctx->si_ctx;
        }
 
-       //fprintf( stderr, ">>>> epsock_rr selecting: grp=%d mtype=%d ngrps=%d\n", group, rte->mtype, rte->nrrgroups );
-
        if( ! more ) {                          // eliminate cheks each time we need to use
                more = &dummy;
        }
@@ -369,7 +375,7 @@ static int uta_epsock_rr( uta_ctx_t* ctx, rtable_ent_t* rte, int group, int* mor
                                ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
                        }
 
-                       if( uta_link2( si_ctx, ep ) ) {                                                                                 // find entry in table and create link
+                       if( uta_link2( ctx, ep ) ) {                                                                                    // find entry in table and create link
                                ep->open = TRUE;
                                *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
                                fd2ep_add( ctx, ep->nn_sock, ep );                              // map fd to ep for disc cleanup
@@ -394,6 +400,8 @@ static int uta_epsock_rr( uta_ctx_t* ctx, rtable_ent_t* rte, int group, int* mor
 
        We've been told that the meid is a string, thus we count on it being a nil
        terminated set of bytes.
+
+       If we return false the caller's ep reference may NOT be valid or even nil.
 */
 static int epsock_meid( uta_ctx_t* ctx, route_table_t *rtable, rmr_mbuf_t* msg, int* nn_sock, endpoint_t** uepp ) {
        endpoint_t*     ep;                             // seected end point
@@ -401,7 +409,7 @@ static int epsock_meid( uta_ctx_t* ctx, route_table_t *rtable, rmr_mbuf_t* msg,
        char*   meid;
        si_ctx_t*       si_ctx;
 
-       if( PARINOID_CHECKS ) {
+       if( PARANOID_CHECKS ) {
                if( ctx == NULL || (si_ctx = ctx->si_ctx) == NULL  ) {
                        return FALSE;
                }
@@ -417,11 +425,12 @@ static int epsock_meid( uta_ctx_t* ctx, route_table_t *rtable, rmr_mbuf_t* msg,
 
        meid = ((uta_mhdr_t *) msg->header)->meid;
 
-       if( (ep = get_meid_owner( rtable, meid )) == NULL ) {
-               if( uepp != NULL ) {                                                            // caller needs refernce to endpoint too
-                       *uepp = NULL;
-               }
+       ep = get_meid_owner( rtable, meid );
+       if( uepp != NULL ) {                                                            // caller needs refernce to endpoint too
+               *uepp = ep;
+       }
 
+       if( ep == NULL ) {
                if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_meid: no ep in hash for (%s)\n", meid );
                return FALSE;
        }
@@ -432,7 +441,7 @@ static int epsock_meid( uta_ctx_t* ctx, route_table_t *rtable, rmr_mbuf_t* msg,
                        ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
                }
 
-               if( uta_link2( si_ctx, ep ) ) {                         // find entry in table and create link
+               if( uta_link2( ctx, ep ) ) {                            // find entry in table and create link
                        ep->open = TRUE;
                        *nn_sock = ep->nn_sock;                                 // pass socket back to caller
                } else {
@@ -448,7 +457,7 @@ static int epsock_meid( uta_ctx_t* ctx, route_table_t *rtable, rmr_mbuf_t* msg,
 
 /*
        Finds the rtable entry which matches the key. Returns a nil pointer if
-       no entry is found. If try_alternate is set, then we will attempt 
+       no entry is found. If try_alternate is set, then we will attempt
        to find the entry with a key based only on the message type.
 */
 static inline rtable_ent_t*  uta_get_rte( route_table_t *rt, int sid, int mtype, int try_alt ) {
@@ -508,26 +517,44 @@ static inline char* get_ep_counts( endpoint_t* ep, char* ubuf, int ubuf_len ) {
 
 /*
        Create the hash which maps file descriptors to endpoints. We need this
-       to easily mark an endpoint as disconnected when we are notified.
+       to easily mark an endpoint as disconnected when we are notified. Thus we
+       expect these to be driven very seldomly; locking should not be an issue.
+       Locking is needed to prevent problems when the user application is multi-
+       threaded and attempting to (re)connect from concurrent threads.
 */
 static void fd2ep_init( uta_ctx_t* ctx ) {
+
        if( ctx  && ! ctx->fd2ep ) {
-               ctx->fd2ep = rmr_sym_alloc( 129 );              
+               ctx->fd2ep = rmr_sym_alloc( 129 );
+
+               if( ctx->fd2ep_gate == NULL ) {
+                       ctx->fd2ep_gate = (pthread_mutex_t *) malloc( sizeof( *ctx->fd2ep_gate ) );
+                       if( ctx->fd2ep_gate != NULL ) {
+                               pthread_mutex_init( ctx->fd2ep_gate, NULL );
+                       }
+               }
        }
 }
 
 /*
-       Add an entry into the fd2ep hash which points to the given ep.
+       Add an entry into the fd2ep hash to map the FD to the endpoint.
 */
 static void fd2ep_add( uta_ctx_t* ctx, int fd, endpoint_t* ep ) {
        if( ctx && ctx->fd2ep ) {
+               pthread_mutex_lock( ctx->fd2ep_gate );
+
                rmr_sym_map( ctx->fd2ep, (uint64_t) fd, (void *) ep );
+
+               pthread_mutex_unlock( ctx->fd2ep_gate );
        }
 }
 
 /*
-       Given a file descriptor fetches the related endpoint from the hash and 
-       deletes the entry from the hash (when we detect a disconnect).  
+       Given a file descriptor this fetches the related endpoint from the hash and
+       deletes the entry from the hash (when we detect a disconnect).
+
+       This will also set the state on the ep open to false, and revoke the
+       FD (nn_socket).
 */
 static endpoint_t*  fd2ep_del( uta_ctx_t* ctx, int fd ) {
        endpoint_t* ep = NULL;
@@ -535,7 +562,11 @@ static endpoint_t*  fd2ep_del( uta_ctx_t* ctx, int fd ) {
        if( ctx && ctx->fd2ep ) {
                ep = rmr_sym_pull(  ctx->fd2ep, (uint64_t) fd );
                if( ep ) {
+                       pthread_mutex_lock( ctx->fd2ep_gate );
+
                        rmr_sym_ndel(  ctx->fd2ep, (uint64_t) fd );
+
+                       pthread_mutex_unlock( ctx->fd2ep_gate );
                }
        }
 
@@ -544,13 +575,17 @@ static endpoint_t*  fd2ep_del( uta_ctx_t* ctx, int fd ) {
 
 /*
        Given a file descriptor fetches the related endpoint from the hash.
-       Returns nil if there is no map.
+       Returns nil if there is no reference in the hash.
 */
 static endpoint_t*  fd2ep_get( uta_ctx_t* ctx, int fd ) {
        endpoint_t* ep = NULL;
 
        if( ctx && ctx->fd2ep ) {
+               pthread_mutex_lock( ctx->fd2ep_gate );
+
                ep = rmr_sym_pull(  ctx->fd2ep, (uint64_t) fd );
+
+               pthread_mutex_unlock( ctx->fd2ep_gate );
        }
 
        return ep;