From: E. Scott Daniels Date: Thu, 6 Feb 2020 18:05:25 +0000 (-0500) Subject: Fix session disconnect bug in interface to SI95 X-Git-Tag: 3.1.3~2 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=c1658934f329e02a704dd5ec94b38dff293b09ee;p=ric-plt%2Flib%2Frmr.git Fix session disconnect bug in interface to SI95 The disconnect of a session wasn't being properly recorded and thus was preventing a reconnection attempt on the next send attempt by the user application. The warning about dropping messages when the application cannot keep up was also changed to be an error so that it is visible at the default logging level. Signed-off-by: E. Scott Daniels Change-Id: Ib07e4d0f9bf0d2b6436bb0b49be4ea37455509d3 --- diff --git a/src/rmr/common/include/rmr_symtab.h b/src/rmr/common/include/rmr_symtab.h index 74c5aa3..80940e6 100644 --- a/src/rmr/common/include/rmr_symtab.h +++ b/src/rmr/common/include/rmr_symtab.h @@ -41,7 +41,7 @@ extern void rmr_sym_clear( void *s ); extern void rmr_sym_dump( void *s ); extern void *rmr_sym_alloc( int size ); extern void rmr_sym_del( void *s, const char *name, unsigned int class ); -extern void *rmr_sym_ndel( void *vtable, uint64_t key ); +extern void rmr_sym_ndel( void *vtable, uint64_t key ); extern void rmr_sym_free( void *vtable ); extern void *rmr_sym_get( void *s, const char *name, unsigned int class ); extern int rmr_sym_put( void *s, const char *name, unsigned int class, void *val ); diff --git a/src/rmr/common/src/rt_generic_static.c b/src/rmr/common/src/rt_generic_static.c index 12edecd..245c9b5 100644 --- a/src/rmr/common/src/rt_generic_static.c +++ b/src/rmr/common/src/rt_generic_static.c @@ -1114,5 +1114,4 @@ static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) { return key; } - #endif diff --git a/src/rmr/common/src/symtab.c b/src/rmr/common/src/symtab.c index 43acfa1..8b94a41 100644 --- a/src/rmr/common/src/symtab.c +++ b/src/rmr/common/src/symtab.c @@ -313,7 +313,7 @@ extern void rmr_sym_del( 
void *vtable, const char *name, unsigned int class ) /* Delete element by numberic key. */ -extern void *rmr_sym_ndel( void *vtable, uint64_t key ) { +extern void rmr_sym_ndel( void *vtable, uint64_t key ) { rmr_sym_del( vtable, (const char *) &key, 0 ); } diff --git a/src/rmr/si/include/rmr_si_private.h b/src/rmr/si/include/rmr_si_private.h index c816486..8c62082 100644 --- a/src/rmr/si/include/rmr_si_private.h +++ b/src/rmr/si/include/rmr_si_private.h @@ -146,6 +146,7 @@ struct uta_ctx { river_t* rivers; // inbound flows (index is the socket fd) int max_ibm; // max size of an inbound message (river accum alloc size) void* zcb_mring; // zero copy buffer mbuf ring + void* fd2ep; // the symtab mapping file des to endpoints for cleanup on disconnect }; typedef uta_ctx_t uta_ctx; @@ -168,8 +169,9 @@ static rtable_ent_t* uta_get_rte( route_table_t *rt, int sid, int mtype, int try static inline int xlate_si_state( int state, int def_state ); // --- these have changes for si -static int uta_epsock_byname( route_table_t* rt, char* ep_name, int* nn_sock, endpoint_t** uepp, si_ctx_t* si_ctx ); -static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, int* nn_sock, endpoint_t** uepp, si_ctx_t* si_ctx ); +static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpoint_t** uepp ); +//static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, int* nn_sock, endpoint_t** uepp, si_ctx_t* si_ctx ); +static int uta_epsock_rr( uta_ctx_t* ctx, rtable_ent_t *rte, int group, int* more, int* nn_sock, endpoint_t** uepp ); // --- msg --------------------------------------- @@ -184,5 +186,10 @@ static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ); static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int retries ); +// ---- fd to endpoint translation ------------------------------ +static endpoint_t* fd2ep_del( uta_ctx_t* ctx, int fd ); +static endpoint_t* fd2ep_get( uta_ctx_t* ctx, int fd ); +static void 
fd2ep_init( uta_ctx_t* ctx ); +static void fd2ep_add( uta_ctx_t* ctx, int fd, endpoint_t* ep ); #endif diff --git a/src/rmr/si/src/mt_call_si_static.c b/src/rmr/si/src/mt_call_si_static.c index 3736420..7a8b666 100644 --- a/src/rmr/si/src/mt_call_si_static.c +++ b/src/rmr/si/src/mt_call_si_static.c @@ -38,7 +38,7 @@ static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) { if( ! uta_ring_insert( ctx->mring, mbuf ) ) { rmr_free_msg( mbuf ); // drop if ring is full if( !warned ) { - rmr_vlog( RMR_VL_WARN, "rmr_mt_receive: application is not receiving fast enough; messages dropping\n" ); + rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; messages dropping\n" ); warned++; } @@ -223,6 +223,31 @@ fprintf( stderr, "\n" ); return SI_RET_OK; } +/* + Callback driven on a disconnect notification. We will attempt to find the related + endpoint via the fd2ep hash maintained in the context. If we find it, then we + remove it from the hash, and mark the endpoint as closed so that the next attempt + to send forces a reconnect attempt. + + Future: put the ep on a queue to automatically attempt to reconnect. +*/ +static int mt_disc_cb( void* vctx, int fd ) { + uta_ctx_t* ctx; + endpoint_t* ep; + + if( (ctx = (uta_ctx_t *) vctx) == NULL ) { + return SI_RET_OK; + } + + ep = fd2ep_del( ctx, fd ); // find ep and remove the fd from the hash + if( ep ) { + ep->open = FALSE; + ep->nn_sock = -1; + } + + return SI_RET_OK; +} + /* This is expected to execute in a separate thread. 
It is responsible for @@ -253,7 +278,10 @@ static void* mt_receive( void* vctx ) { } if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mt_receive: registering SI95 data callback and waiting\n" ); + SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx ); // our callback called only for "cooked" (tcp) data + SIcbreg( ctx->si_ctx, SI_CB_DISC, mt_disc_cb, vctx ); // our callback for handling disconnects + SIwait( ctx->si_ctx ); return NULL; // keep the compiler happy though never can be reached as SI wait doesn't return diff --git a/src/rmr/si/src/rmr_si.c b/src/rmr/si/src/rmr_si.c index 8697bc5..31da2cb 100644 --- a/src/rmr/si/src/rmr_si.c +++ b/src/rmr/si/src/rmr_si.c @@ -286,7 +286,8 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { */ if( (nn_sock = msg->rts_fd) < 0 ) { if( HDR_VERSION( msg->header ) > 2 ) { // with ver2 the ip is there, try if src name not known - sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep, ctx->si_ctx ); + //sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep, ctx->si_ctx ); + sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep ); } if( ! sock_ok ) { msg->state = RMR_ERR_NOENDPT; @@ -294,7 +295,6 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { } } - msg->state = RMR_OK; // ensure it is clear before send hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src ); // the dest where we're returning the message to hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip ); // both the src host and src ip @@ -548,7 +548,7 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { rmr_set_vlevel( RMR_VL_INFO ); // we WILL announce our version etc if( ! 
announced ) { - rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/b mv=%d flg=%02x (%s %s.%s.%s built: %s)\n", + rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/c mv=%d flg=%02x (%s %s.%s.%s built: %s)\n", RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ ); announced = 1; } @@ -590,6 +590,7 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { rmr_vlog( RMR_VL_INFO, "receive ring locking disabled by user application\n" ); } init_mtcall( ctx ); // set up call chutes + fd2ep_init( ctx ); // initialise the fd to endpoint sym tab ctx->max_plen = RMR_MAX_RCV_BYTES; // max user payload lengh diff --git a/src/rmr/si/src/rtable_si_static.c b/src/rmr/si/src/rtable_si_static.c index f7360a9..a79291d 100644 --- a/src/rmr/si/src/rtable_si_static.c +++ b/src/rmr/si/src/rtable_si_static.c @@ -54,7 +54,7 @@ static void uta_ep_failed( endpoint_t* ep ) { if( ep != NULL ) { if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "connection to %s was closed\n", ep->name ); - ep->open = 0; + ep->open = FALSE; } } @@ -145,6 +145,9 @@ static int rt_link2_ep( void* vctx, endpoint_t* ep ) { } uta_link2( ctx->si_ctx, ep ); + if( ep->open ) { + fd2ep_add( ctx, ep->nn_sock, ep ); // map fd to ep for disc cleanup + } return ep->open; } @@ -217,12 +220,19 @@ extern endpoint_t* uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n the user pointer passed in and sets the return value to true (1). If the endpoint cannot be found false (0) is returned. 
*/ -static int uta_epsock_byname( route_table_t* rt, char* ep_name, int* nn_sock, endpoint_t** uepp, si_ctx_t* si_ctx ) { - endpoint_t* ep; - int state = FALSE; - - if( rt == NULL ) { - return FALSE; +static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpoint_t** uepp ) { + route_table_t* rt; + si_ctx_t* si_ctx; + endpoint_t* ep; + int state = FALSE; + + if( PARINOID_CHECKS ) { + if( ctx == NULL || (rt = ctx->rtable) == NULL || (si_ctx = ctx->si_ctx) == NULL ) { + return FALSE; + } + } else { + rt = ctx->rtable; // faster but more risky + si_ctx = ctx->si_ctx; } ep = rmr_sym_get( rt->hash, ep_name, 1 ); @@ -245,6 +255,7 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, int* nn_sock, en state = TRUE; ep->open = TRUE; *nn_sock = ep->nn_sock; // pass socket back to caller + fd2ep_add( ctx, ep->nn_sock, ep ); // map fd to this ep for disc cleanup } if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_bn: connection state: %s %s\n", state ? "[OK]" : "[FAIL]", ep->name ); } else { @@ -281,13 +292,22 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, int* nn_sock, en Both of these, in the grand scheme of things, is minor compared to the overhead of grabbing a lock on each call. */ -static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, int* nn_sock, endpoint_t** uepp, si_ctx_t* si_ctx ) { - endpoint_t* ep; // seected end point +static int uta_epsock_rr( uta_ctx_t* ctx, rtable_ent_t* rte, int group, int* more, int* nn_sock, endpoint_t** uepp ) { + si_ctx_t* si_ctx; + endpoint_t* ep; // selected end point int state = FALSE; // processing state int dummy; rrgroup_t* rrg; int idx; + if( PARINOID_CHECKS ) { + if( ctx == NULL || (si_ctx = ctx->si_ctx) == NULL ) { + return FALSE; + } + } else { + si_ctx = ctx->si_ctx; + } + //fprintf( stderr, ">>>> epsock_rr selecting: grp=%d mtype=%d ngrps=%d\n", group, rte->mtype, rte->nrrgroups ); if( ! 
more ) { // eliminate cheks each time we need to use @@ -353,6 +373,7 @@ static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, int* nn_sock, if( uta_link2( si_ctx, ep ) ) { // find entry in table and create link ep->open = TRUE; *nn_sock = ep->nn_sock; // pass socket back to caller + fd2ep_add( ctx, ep->nn_sock, ep ); // map fd to ep for disc cleanup } else { state = FALSE; } @@ -423,4 +444,58 @@ static inline char* get_ep_counts( endpoint_t* ep, char* ubuf, int ubuf_len ) { return rs; } + +// ---- fd to ep functions -------------------------------------------------------------------------- + +/* + Create the hash which maps file descriptors to endpoints. We need this + to easily mark an endpoint as disconnected when we are notified. +*/ +static void fd2ep_init( uta_ctx_t* ctx ) { + if( ctx && ! ctx->fd2ep ) { + ctx->fd2ep = rmr_sym_alloc( 129 ); + } +} + +/* + Add an entry into the fd2ep hash which points to the given ep. +*/ +static void fd2ep_add( uta_ctx_t* ctx, int fd, endpoint_t* ep ) { + if( ctx && ctx->fd2ep ) { + rmr_sym_map( ctx->fd2ep, (uint64_t) fd, (void *) ep ); + } +} + +/* + Given a file descriptor fetches the related endpoint from the hash and + deletes the entry from the hash (when we detect a disconnect). +*/ +static endpoint_t* fd2ep_del( uta_ctx_t* ctx, int fd ) { + endpoint_t* ep = NULL; + + if( ctx && ctx->fd2ep ) { + ep = rmr_sym_pull( ctx->fd2ep, (uint64_t) fd ); + if( ep ) { + rmr_sym_ndel( ctx->fd2ep, (uint64_t) fd ); + } + } + + return ep; +} + +/* + Given a file descriptor fetches the related endpoint from the hash. + Returns nil if there is no map. 
+*/ +static endpoint_t* fd2ep_get( uta_ctx_t* ctx, int fd ) { + endpoint_t* ep = NULL; + + if( ctx && ctx->fd2ep ) { + ep = rmr_sym_pull( ctx->fd2ep, (uint64_t) fd ); + } + + return ep; +} + + #endif diff --git a/src/rmr/si/src/si95/sisendt.c b/src/rmr/si/src/si95/sisendt.c index 20cf7b3..ae07edb 100644 --- a/src/rmr/si/src/si95/sisendt.c +++ b/src/rmr/si/src/si95/sisendt.c @@ -100,6 +100,8 @@ extern int SIsendt( struct ginfo_blk *gptr, int fd, char *ubuf, int ulen ) { errno = EBUSY; status = SI_ERR_BLOCKED; } + } else { + errno = EBADFD; // fd in a bad state (probably closed) } return status; diff --git a/src/rmr/si/src/sr_si_static.c b/src/rmr/si/src/sr_si_static.c index c067948..ba22bc9 100644 --- a/src/rmr/si/src/sr_si_static.c +++ b/src/rmr/si/src/sr_si_static.c @@ -754,7 +754,7 @@ static rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { send_again = 1; // force loop entry group = 0; // always start with group 0 while( send_again ) { - sock_ok = uta_epsock_rr( rte, group, &send_again, &nn_sock, &ep, ctx->si_ctx ); // select endpt from rr group and set again if more groups + sock_ok = uta_epsock_rr( ctx, rte, group, &send_again, &nn_sock, &ep ); // select endpt from rr group and set again if more groups if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n", msg->flags, msg->mtype, send_again, group, msg->len, sock_ok );