Merge "Cmake, change files missing from previous commit"
authorMatti Hiltunen <hiltunen@att.com>
Thu, 6 Feb 2020 19:16:34 +0000 (19:16 +0000)
committerGerrit Code Review <gerrit@o-ran-sc.org>
Thu, 6 Feb 2020 19:16:34 +0000 (19:16 +0000)
src/rmr/common/include/rmr_symtab.h
src/rmr/common/src/rt_generic_static.c
src/rmr/common/src/symtab.c
src/rmr/si/include/rmr_si_private.h
src/rmr/si/src/mt_call_si_static.c
src/rmr/si/src/rmr_si.c
src/rmr/si/src/rtable_si_static.c
src/rmr/si/src/si95/sisendt.c
src/rmr/si/src/sr_si_static.c

index 74c5aa3..80940e6 100644 (file)
@@ -41,7 +41,7 @@ extern void rmr_sym_clear( void *s );
 extern void rmr_sym_dump( void *s );
 extern void *rmr_sym_alloc( int size );
 extern void rmr_sym_del( void *s, const char *name, unsigned int class );
-extern void *rmr_sym_ndel( void *vtable, uint64_t key );
+extern void rmr_sym_ndel( void *vtable, uint64_t key );
 extern void rmr_sym_free( void *vtable );
 extern void *rmr_sym_get( void *s,  const char *name, unsigned int class );
 extern int rmr_sym_put( void *s,  const char *name, unsigned int class, void *val );
index 12edecd..245c9b5 100644 (file)
@@ -1114,5 +1114,4 @@ static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
        return key;
 }
 
-
 #endif
index 43acfa1..8b94a41 100644 (file)
@@ -313,7 +313,7 @@ extern void rmr_sym_del( void *vtable, const char *name, unsigned int class )
 /*
        Delete element by numberic key.
 */
-extern void *rmr_sym_ndel(  void *vtable, uint64_t key ) {
+extern void rmr_sym_ndel(  void *vtable, uint64_t key ) {
        rmr_sym_del( vtable, (const char *) &key, 0 );
 }
 
index c816486..8c62082 100644 (file)
@@ -146,6 +146,7 @@ struct uta_ctx {
        river_t*        rivers;                 // inbound flows (index is the socket fd)
        int                     max_ibm;                // max size of an inbound message (river accum alloc size)
        void*           zcb_mring;              // zero copy buffer mbuf ring
+       void*           fd2ep;                  // the symtab mapping file des to endpoints for cleanup on disconnect
 };
 
 typedef uta_ctx_t uta_ctx;
@@ -168,8 +169,9 @@ static rtable_ent_t* uta_get_rte( route_table_t *rt, int sid, int mtype, int try
 static inline int xlate_si_state( int state, int def_state );
 
 // --- these have changes for si
-static int uta_epsock_byname( route_table_t* rt, char* ep_name, int* nn_sock, endpoint_t** uepp, si_ctx_t* si_ctx );
-static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, int* nn_sock, endpoint_t** uepp, si_ctx_t* si_ctx );
+static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpoint_t** uepp );
+//static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, int* nn_sock, endpoint_t** uepp, si_ctx_t* si_ctx );
+static int uta_epsock_rr( uta_ctx_t* ctx, rtable_ent_t *rte, int group, int* more, int* nn_sock, endpoint_t** uepp );
 
 
 // --- msg ---------------------------------------
@@ -184,5 +186,10 @@ static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg );
 
 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int retries );
 
+// ---- fd to endpoint translation ------------------------------
+static endpoint_t*  fd2ep_del( uta_ctx_t* ctx, int fd );
+static endpoint_t*  fd2ep_get( uta_ctx_t* ctx, int fd );
+static void fd2ep_init( uta_ctx_t* ctx );
+static void fd2ep_add( uta_ctx_t* ctx, int fd, endpoint_t* ep );
 
 #endif
index 3736420..7a8b666 100644 (file)
@@ -38,7 +38,7 @@ static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
        if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
                rmr_free_msg( mbuf );                                                           // drop if ring is full
                if( !warned ) {
-                       rmr_vlog( RMR_VL_WARN, "rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
+                       rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
                        warned++;
                }
 
@@ -223,6 +223,31 @@ fprintf( stderr, "\n" );
        return SI_RET_OK;
 }
 
+/*
+       Callback driven on a disconnect notification. We will attempt to find the related 
+       endpoint via the fd2ep hash maintained in the context. If we find it, then we 
+       remove it from the hash, and mark the endpoint as closed so that the next attempt
+       to send forces a reconnect attempt.
+
+       Future: put the ep on a queue to automatically attempt to reconnect.
+*/
+static int mt_disc_cb( void* vctx, int fd ) {
+       uta_ctx_t*      ctx;
+       endpoint_t*     ep;
+
+       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               return SI_RET_OK;
+       }
+
+       ep = fd2ep_del( ctx, fd );              // find ep and remote the fd from the hash
+       if( ep ) {
+               ep->open = FALSE;
+               ep->nn_sock = -1;
+       }
+
+       return SI_RET_OK;
+}
+
 
 /*
        This is expected to execute in a separate thread. It is responsible for
@@ -253,7 +278,10 @@ static void* mt_receive( void* vctx ) {
        }
 
        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mt_receive: registering SI95 data callback and waiting\n" );
+
        SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx );                  // our callback called only for "cooked" (tcp) data
+       SIcbreg( ctx->si_ctx, SI_CB_DISC, mt_disc_cb, vctx );                   // our callback for handling disconnects
+
        SIwait( ctx->si_ctx );
 
        return NULL;            // keep the compiler happy though never can be reached as SI wait doesn't return
index 8697bc5..31da2cb 100644 (file)
@@ -286,7 +286,8 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
 */
        if( (nn_sock = msg->rts_fd) < 0 ) {
                if( HDR_VERSION( msg->header ) > 2 ) {                                                  // with ver2 the ip is there, try if src name not known
-                       sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep, ctx->si_ctx );
+                       //sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep, ctx->si_ctx );
+                       sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep  );
                }
                if( ! sock_ok ) {
                        msg->state = RMR_ERR_NOENDPT;
@@ -294,7 +295,6 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
                }
        }
 
-
        msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
        hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
        hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip );                                        // both the src host and src ip
@@ -548,7 +548,7 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        rmr_set_vlevel( RMR_VL_INFO );          // we WILL announce our version etc
 
        if( ! announced ) {
-               rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/b mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
+               rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/c mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
                        RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
                announced = 1;
        }
@@ -590,6 +590,7 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
                rmr_vlog( RMR_VL_INFO, "receive ring locking disabled by user application\n" );
        }
        init_mtcall( ctx );                                                             // set up call chutes
+       fd2ep_init( ctx );                                                              // initialise the fd to endpoint sym tab
 
 
        ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
index f7360a9..a79291d 100644 (file)
@@ -54,7 +54,7 @@
 static void uta_ep_failed( endpoint_t* ep ) {
        if( ep != NULL ) {
                if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "connection to %s was closed\n", ep->name );
-               ep->open = 0;
+               ep->open = FALSE;
        }
 }
 
@@ -145,6 +145,9 @@ static int rt_link2_ep( void* vctx, endpoint_t* ep ) {
        }
 
        uta_link2( ctx->si_ctx, ep );
+       if( ep->open ) {
+               fd2ep_add( ctx, ep->nn_sock, ep );              // map fd to ep for disc cleanup
+       }
        return ep->open;
 }
 
@@ -217,12 +220,19 @@ extern endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n
        the user pointer passed in and sets the return value to true (1). If the
        endpoint cannot be found false (0) is returned.
 */
-static int uta_epsock_byname( route_table_t* rt, char* ep_name, int* nn_sock, endpoint_t** uepp, si_ctx_t* si_ctx ) {
-       endpoint_t* ep;
-       int state = FALSE;
-
-       if( rt == NULL ) {
-               return FALSE;
+static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpoint_t** uepp ) {
+       route_table_t*  rt; 
+       si_ctx_t*               si_ctx;
+       endpoint_t*             ep;
+       int                             state = FALSE;
+
+       if( PARINOID_CHECKS ) {
+               if( ctx == NULL || (rt = ctx->rtable) == NULL || (si_ctx = ctx->si_ctx) == NULL  ) {
+                       return FALSE;
+               }
+       } else {
+               rt = ctx->rtable;                               // faster but more risky
+               si_ctx = ctx->si_ctx;
        }
 
        ep =  rmr_sym_get( rt->hash, ep_name, 1 );
@@ -245,6 +255,7 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, int* nn_sock, en
                        state = TRUE;
                        ep->open = TRUE;
                        *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
+                       fd2ep_add( ctx, ep->nn_sock, ep );                              // map fd to this ep for disc cleanup
                }
                if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_bn: connection state: %s %s\n", state ? "[OK]" : "[FAIL]", ep->name );
        } else {
@@ -281,13 +292,22 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, int* nn_sock, en
                Both of these, in the grand scheme of things, is minor compared to the
                overhead of grabbing a lock on each call.
 */
-static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, int* nn_sock, endpoint_t** uepp, si_ctx_t* si_ctx ) {
-       endpoint_t*     ep;                             // seected end point
+static int uta_epsock_rr( uta_ctx_t* ctx, rtable_ent_t* rte, int group, int* more, int* nn_sock, endpoint_t** uepp ) {
+       si_ctx_t*               si_ctx;
+       endpoint_t*     ep;                             // selected end point
        int  state = FALSE;                     // processing state
        int dummy;
        rrgroup_t* rrg;
        int     idx;
 
+       if( PARINOID_CHECKS ) {
+               if( ctx == NULL || (si_ctx = ctx->si_ctx) == NULL  ) {
+                       return FALSE;
+               }
+       } else {
+               si_ctx = ctx->si_ctx;
+       }
+
        //fprintf( stderr, ">>>> epsock_rr selecting: grp=%d mtype=%d ngrps=%d\n", group, rte->mtype, rte->nrrgroups );
 
        if( ! more ) {                          // eliminate cheks each time we need to use
@@ -353,6 +373,7 @@ static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, int* nn_sock,
                        if( uta_link2( si_ctx, ep ) ) {                                                                                 // find entry in table and create link
                                ep->open = TRUE;
                                *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
+                               fd2ep_add( ctx, ep->nn_sock, ep );                              // map fd to ep for disc cleanup
                        } else {
                                state = FALSE;
                        }
@@ -423,4 +444,58 @@ static inline char* get_ep_counts( endpoint_t* ep, char* ubuf, int ubuf_len ) {
        return rs;
 }
 
+
+// ---- fd to ep functions --------------------------------------------------------------------------
+
+/*
+       Create the hash which maps file descriptors to endpoints. We need this
+       to easily mark an endpoint as disconnected when we are notified.
+*/
+static void fd2ep_init( uta_ctx_t* ctx ) {
+       if( ctx  && ! ctx->fd2ep ) {
+               ctx->fd2ep = rmr_sym_alloc( 129 );              
+       }
+}
+
+/*
+       Add an entry into the fd2ep hash which points to the given ep.
+*/
+static void fd2ep_add( uta_ctx_t* ctx, int fd, endpoint_t* ep ) {
+       if( ctx && ctx->fd2ep ) {
+               rmr_sym_map( ctx->fd2ep, (uint64_t) fd, (void *) ep );
+       }
+}
+
+/*
+       Given a file descriptor fetches the related endpoint from the hash and 
+       deletes the entry from the hash (when we detect a disconnect).  
+*/
+static endpoint_t*  fd2ep_del( uta_ctx_t* ctx, int fd ) {
+       endpoint_t* ep = NULL;
+
+       if( ctx && ctx->fd2ep ) {
+               ep = rmr_sym_pull(  ctx->fd2ep, (uint64_t) fd );
+               if( ep ) {
+                       rmr_sym_ndel(  ctx->fd2ep, (uint64_t) fd );
+               }
+       }
+
+       return ep;
+}
+
+/*
+       Given a file descriptor fetches the related endpoint from the hash.
+       Returns nil if there is no map.
+*/
+static endpoint_t*  fd2ep_get( uta_ctx_t* ctx, int fd ) {
+       endpoint_t* ep = NULL;
+
+       if( ctx && ctx->fd2ep ) {
+               ep = rmr_sym_pull(  ctx->fd2ep, (uint64_t) fd );
+       }
+
+       return ep;
+}
+
+
 #endif
index 20cf7b3..ae07edb 100644 (file)
@@ -100,6 +100,8 @@ extern int SIsendt( struct ginfo_blk *gptr, int fd, char *ubuf, int ulen ) {
                        errno = EBUSY;
                        status = SI_ERR_BLOCKED;
                }
+       } else {
+               errno = EBADFD;                 // fd in a bad state (probably losed)
        }
 
        return status;
index c067948..ba22bc9 100644 (file)
@@ -754,7 +754,7 @@ static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
        send_again = 1;                                                                                 // force loop entry
        group = 0;                                                                                              // always start with group 0
        while( send_again ) {
-               sock_ok = uta_epsock_rr( rte, group, &send_again, &nn_sock, &ep, ctx->si_ctx );         // select endpt from rr group and set again if more groups
+               sock_ok = uta_epsock_rr( ctx, rte, group, &send_again, &nn_sock, &ep );         // select endpt from rr group and set again if more groups
 
                if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n",
                                msg->flags, msg->mtype, send_again, group, msg->len, sock_ok );