Fix session discconnect bug in interface to SI95 31/2431/1
authorE. Scott Daniels <daniels@research.att.com>
Thu, 6 Feb 2020 18:05:25 +0000 (13:05 -0500)
committerE. Scott Daniels <daniels@research.att.com>
Thu, 6 Feb 2020 18:05:25 +0000 (13:05 -0500)
The disconnect of a session wasn't being properly recorded
and thus was preventing a reconnection attempt on the next
send attempt by the user application.

The warning about dropping messsages when the application cannot
keep up was also changed to be an error so that it is visible
at the default logging level.

Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: Ib07e4d0f9bf0d2b6436bb0b49be4ea37455509d3

src/rmr/common/include/rmr_symtab.h
src/rmr/common/src/rt_generic_static.c
src/rmr/common/src/symtab.c
src/rmr/si/include/rmr_si_private.h
src/rmr/si/src/mt_call_si_static.c
src/rmr/si/src/rmr_si.c
src/rmr/si/src/rtable_si_static.c
src/rmr/si/src/si95/sisendt.c
src/rmr/si/src/sr_si_static.c

index 74c5aa3..80940e6 100644 (file)
@@ -41,7 +41,7 @@ extern void rmr_sym_clear( void *s );
 extern void rmr_sym_dump( void *s );
 extern void *rmr_sym_alloc( int size );
 extern void rmr_sym_del( void *s, const char *name, unsigned int class );
-extern void *rmr_sym_ndel( void *vtable, uint64_t key );
+extern void rmr_sym_ndel( void *vtable, uint64_t key );
 extern void rmr_sym_free( void *vtable );
 extern void *rmr_sym_get( void *s,  const char *name, unsigned int class );
 extern int rmr_sym_put( void *s,  const char *name, unsigned int class, void *val );
index 12edecd..245c9b5 100644 (file)
@@ -1114,5 +1114,4 @@ static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
        return key;
 }
 
-
 #endif
index 43acfa1..8b94a41 100644 (file)
@@ -313,7 +313,7 @@ extern void rmr_sym_del( void *vtable, const char *name, unsigned int class )
 /*
        Delete element by numberic key.
 */
-extern void *rmr_sym_ndel(  void *vtable, uint64_t key ) {
+extern void rmr_sym_ndel(  void *vtable, uint64_t key ) {
        rmr_sym_del( vtable, (const char *) &key, 0 );
 }
 
index c816486..8c62082 100644 (file)
@@ -146,6 +146,7 @@ struct uta_ctx {
        river_t*        rivers;                 // inbound flows (index is the socket fd)
        int                     max_ibm;                // max size of an inbound message (river accum alloc size)
        void*           zcb_mring;              // zero copy buffer mbuf ring
+       void*           fd2ep;                  // the symtab mapping file des to endpoints for cleanup on disconnect
 };
 
 typedef uta_ctx_t uta_ctx;
@@ -168,8 +169,9 @@ static rtable_ent_t* uta_get_rte( route_table_t *rt, int sid, int mtype, int try
 static inline int xlate_si_state( int state, int def_state );
 
 // --- these have changes for si
-static int uta_epsock_byname( route_table_t* rt, char* ep_name, int* nn_sock, endpoint_t** uepp, si_ctx_t* si_ctx );
-static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, int* nn_sock, endpoint_t** uepp, si_ctx_t* si_ctx );
+static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpoint_t** uepp );
+//static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, int* nn_sock, endpoint_t** uepp, si_ctx_t* si_ctx );
+static int uta_epsock_rr( uta_ctx_t* ctx, rtable_ent_t *rte, int group, int* more, int* nn_sock, endpoint_t** uepp );
 
 
 // --- msg ---------------------------------------
@@ -184,5 +186,10 @@ static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg );
 
 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int retries );
 
+// ---- fd to endpoint translation ------------------------------
+static endpoint_t*  fd2ep_del( uta_ctx_t* ctx, int fd );
+static endpoint_t*  fd2ep_get( uta_ctx_t* ctx, int fd );
+static void fd2ep_init( uta_ctx_t* ctx );
+static void fd2ep_add( uta_ctx_t* ctx, int fd, endpoint_t* ep );
 
 #endif
index 3736420..7a8b666 100644 (file)
@@ -38,7 +38,7 @@ static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
        if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
                rmr_free_msg( mbuf );                                                           // drop if ring is full
                if( !warned ) {
-                       rmr_vlog( RMR_VL_WARN, "rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
+                       rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
                        warned++;
                }
 
@@ -223,6 +223,31 @@ fprintf( stderr, "\n" );
        return SI_RET_OK;
 }
 
+/*
+       Callback driven on a disconnect notification. We will attempt to find the related 
+       endpoint via the fd2ep hash maintained in the context. If we find it, then we 
+       remove it from the hash, and mark the endpoint as closed so that the next attempt
+       to send forces a reconnect attempt.
+
+       Future: put the ep on a queue to automatically attempt to reconnect.
+*/
+static int mt_disc_cb( void* vctx, int fd ) {
+       uta_ctx_t*      ctx;
+       endpoint_t*     ep;
+
+       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               return SI_RET_OK;
+       }
+
+       ep = fd2ep_del( ctx, fd );              // find ep and remote the fd from the hash
+       if( ep ) {
+               ep->open = FALSE;
+               ep->nn_sock = -1;
+       }
+
+       return SI_RET_OK;
+}
+
 
 /*
        This is expected to execute in a separate thread. It is responsible for
@@ -253,7 +278,10 @@ static void* mt_receive( void* vctx ) {
        }
 
        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mt_receive: registering SI95 data callback and waiting\n" );
+
        SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx );                  // our callback called only for "cooked" (tcp) data
+       SIcbreg( ctx->si_ctx, SI_CB_DISC, mt_disc_cb, vctx );                   // our callback for handling disconnects
+
        SIwait( ctx->si_ctx );
 
        return NULL;            // keep the compiler happy though never can be reached as SI wait doesn't return
index 8697bc5..31da2cb 100644 (file)
@@ -286,7 +286,8 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
 */
        if( (nn_sock = msg->rts_fd) < 0 ) {
                if( HDR_VERSION( msg->header ) > 2 ) {                                                  // with ver2 the ip is there, try if src name not known
-                       sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep, ctx->si_ctx );
+                       //sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep, ctx->si_ctx );
+                       sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep  );
                }
                if( ! sock_ok ) {
                        msg->state = RMR_ERR_NOENDPT;
@@ -294,7 +295,6 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
                }
        }
 
-
        msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
        hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
        hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip );                                        // both the src host and src ip
@@ -548,7 +548,7 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        rmr_set_vlevel( RMR_VL_INFO );          // we WILL announce our version etc
 
        if( ! announced ) {
-               rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/b mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
+               rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/c mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
                        RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
                announced = 1;
        }
@@ -590,6 +590,7 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
                rmr_vlog( RMR_VL_INFO, "receive ring locking disabled by user application\n" );
        }
        init_mtcall( ctx );                                                             // set up call chutes
+       fd2ep_init( ctx );                                                              // initialise the fd to endpoint sym tab
 
 
        ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
index f7360a9..a79291d 100644 (file)
@@ -54,7 +54,7 @@
 static void uta_ep_failed( endpoint_t* ep ) {
        if( ep != NULL ) {
                if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "connection to %s was closed\n", ep->name );
-               ep->open = 0;
+               ep->open = FALSE;
        }
 }
 
@@ -145,6 +145,9 @@ static int rt_link2_ep( void* vctx, endpoint_t* ep ) {
        }
 
        uta_link2( ctx->si_ctx, ep );
+       if( ep->open ) {
+               fd2ep_add( ctx, ep->nn_sock, ep );              // map fd to ep for disc cleanup
+       }
        return ep->open;
 }
 
@@ -217,12 +220,19 @@ extern endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n
        the user pointer passed in and sets the return value to true (1). If the
        endpoint cannot be found false (0) is returned.
 */
-static int uta_epsock_byname( route_table_t* rt, char* ep_name, int* nn_sock, endpoint_t** uepp, si_ctx_t* si_ctx ) {
-       endpoint_t* ep;
-       int state = FALSE;
-
-       if( rt == NULL ) {
-               return FALSE;
+static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpoint_t** uepp ) {
+       route_table_t*  rt; 
+       si_ctx_t*               si_ctx;
+       endpoint_t*             ep;
+       int                             state = FALSE;
+
+       if( PARINOID_CHECKS ) {
+               if( ctx == NULL || (rt = ctx->rtable) == NULL || (si_ctx = ctx->si_ctx) == NULL  ) {
+                       return FALSE;
+               }
+       } else {
+               rt = ctx->rtable;                               // faster but more risky
+               si_ctx = ctx->si_ctx;
        }
 
        ep =  rmr_sym_get( rt->hash, ep_name, 1 );
@@ -245,6 +255,7 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, int* nn_sock, en
                        state = TRUE;
                        ep->open = TRUE;
                        *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
+                       fd2ep_add( ctx, ep->nn_sock, ep );                              // map fd to this ep for disc cleanup
                }
                if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_bn: connection state: %s %s\n", state ? "[OK]" : "[FAIL]", ep->name );
        } else {
@@ -281,13 +292,22 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, int* nn_sock, en
                Both of these, in the grand scheme of things, is minor compared to the
                overhead of grabbing a lock on each call.
 */
-static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, int* nn_sock, endpoint_t** uepp, si_ctx_t* si_ctx ) {
-       endpoint_t*     ep;                             // seected end point
+static int uta_epsock_rr( uta_ctx_t* ctx, rtable_ent_t* rte, int group, int* more, int* nn_sock, endpoint_t** uepp ) {
+       si_ctx_t*               si_ctx;
+       endpoint_t*     ep;                             // selected end point
        int  state = FALSE;                     // processing state
        int dummy;
        rrgroup_t* rrg;
        int     idx;
 
+       if( PARINOID_CHECKS ) {
+               if( ctx == NULL || (si_ctx = ctx->si_ctx) == NULL  ) {
+                       return FALSE;
+               }
+       } else {
+               si_ctx = ctx->si_ctx;
+       }
+
        //fprintf( stderr, ">>>> epsock_rr selecting: grp=%d mtype=%d ngrps=%d\n", group, rte->mtype, rte->nrrgroups );
 
        if( ! more ) {                          // eliminate cheks each time we need to use
@@ -353,6 +373,7 @@ static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, int* nn_sock,
                        if( uta_link2( si_ctx, ep ) ) {                                                                                 // find entry in table and create link
                                ep->open = TRUE;
                                *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
+                               fd2ep_add( ctx, ep->nn_sock, ep );                              // map fd to ep for disc cleanup
                        } else {
                                state = FALSE;
                        }
@@ -423,4 +444,58 @@ static inline char* get_ep_counts( endpoint_t* ep, char* ubuf, int ubuf_len ) {
        return rs;
 }
 
+
+// ---- fd to ep functions --------------------------------------------------------------------------
+
+/*
+       Create the hash which maps file descriptors to endpoints. We need this
+       to easily mark an endpoint as disconnected when we are notified.
+*/
+static void fd2ep_init( uta_ctx_t* ctx ) {
+       if( ctx  && ! ctx->fd2ep ) {
+               ctx->fd2ep = rmr_sym_alloc( 129 );              
+       }
+}
+
+/*
+       Add an entry into the fd2ep hash which points to the given ep.
+*/
+static void fd2ep_add( uta_ctx_t* ctx, int fd, endpoint_t* ep ) {
+       if( ctx && ctx->fd2ep ) {
+               rmr_sym_map( ctx->fd2ep, (uint64_t) fd, (void *) ep );
+       }
+}
+
+/*
+       Given a file descriptor fetches the related endpoint from the hash and 
+       deletes the entry from the hash (when we detect a disconnect).  
+*/
+static endpoint_t*  fd2ep_del( uta_ctx_t* ctx, int fd ) {
+       endpoint_t* ep = NULL;
+
+       if( ctx && ctx->fd2ep ) {
+               ep = rmr_sym_pull(  ctx->fd2ep, (uint64_t) fd );
+               if( ep ) {
+                       rmr_sym_ndel(  ctx->fd2ep, (uint64_t) fd );
+               }
+       }
+
+       return ep;
+}
+
+/*
+       Given a file descriptor fetches the related endpoint from the hash.
+       Returns nil if there is no map.
+*/
+static endpoint_t*  fd2ep_get( uta_ctx_t* ctx, int fd ) {
+       endpoint_t* ep = NULL;
+
+       if( ctx && ctx->fd2ep ) {
+               ep = rmr_sym_pull(  ctx->fd2ep, (uint64_t) fd );
+       }
+
+       return ep;
+}
+
+
 #endif
index 20cf7b3..ae07edb 100644 (file)
@@ -100,6 +100,8 @@ extern int SIsendt( struct ginfo_blk *gptr, int fd, char *ubuf, int ulen ) {
                        errno = EBUSY;
                        status = SI_ERR_BLOCKED;
                }
+       } else {
+               errno = EBADFD;                 // fd in a bad state (probably losed)
        }
 
        return status;
index c067948..ba22bc9 100644 (file)
@@ -754,7 +754,7 @@ static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
        send_again = 1;                                                                                 // force loop entry
        group = 0;                                                                                              // always start with group 0
        while( send_again ) {
-               sock_ok = uta_epsock_rr( rte, group, &send_again, &nn_sock, &ep, ctx->si_ctx );         // select endpt from rr group and set again if more groups
+               sock_ok = uta_epsock_rr( ctx, rte, group, &send_again, &nn_sock, &ep );         // select endpt from rr group and set again if more groups
 
                if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n",
                                msg->flags, msg->mtype, send_again, group, msg->len, sock_ok );