extern void rmr_sym_dump( void *s );
extern void *rmr_sym_alloc( int size );
extern void rmr_sym_del( void *s, const char *name, unsigned int class );
-extern void *rmr_sym_ndel( void *vtable, uint64_t key );
+extern void rmr_sym_ndel( void *vtable, uint64_t key );
extern void rmr_sym_free( void *vtable );
extern void *rmr_sym_get( void *s, const char *name, unsigned int class );
extern int rmr_sym_put( void *s, const char *name, unsigned int class, void *val );
/*
Delete element by numeric key.

The address of the 64 bit key is cast to a character pointer and handed
to rmr_sym_del() along with a class of 0; nothing is returned to the
caller.
NOTE(review): class 0 presumably identifies numerically keyed entries
to rmr_sym_del() -- confirm against that function.
*/
-extern void *rmr_sym_ndel( void *vtable, uint64_t key ) {
+extern void rmr_sym_ndel( void *vtable, uint64_t key ) {
rmr_sym_del( vtable, (const char *) &key, 0 );
}
river_t* rivers; // inbound flows (index is the socket fd)
int max_ibm; // max size of an inbound message (river accum alloc size)
void* zcb_mring; // zero copy buffer mbuf ring
+ void* fd2ep; // the symtab mapping file des to endpoints for cleanup on disconnect
};
typedef uta_ctx_t uta_ctx;
static inline int xlate_si_state( int state, int def_state );
// --- these have changes for si
-static int uta_epsock_byname( route_table_t* rt, char* ep_name, int* nn_sock, endpoint_t** uepp, si_ctx_t* si_ctx );
-static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, int* nn_sock, endpoint_t** uepp, si_ctx_t* si_ctx );
+static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpoint_t** uepp );
+//static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, int* nn_sock, endpoint_t** uepp, si_ctx_t* si_ctx );
+static int uta_epsock_rr( uta_ctx_t* ctx, rtable_ent_t *rte, int group, int* more, int* nn_sock, endpoint_t** uepp );
// --- msg ---------------------------------------
static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int retries );
+// ---- fd to endpoint translation ------------------------------
+static endpoint_t* fd2ep_del( uta_ctx_t* ctx, int fd );
+static endpoint_t* fd2ep_get( uta_ctx_t* ctx, int fd );
+static void fd2ep_init( uta_ctx_t* ctx );
+static void fd2ep_add( uta_ctx_t* ctx, int fd, endpoint_t* ep );
#endif
if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
rmr_free_msg( mbuf ); // drop if ring is full
if( !warned ) {
- rmr_vlog( RMR_VL_WARN, "rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
+ rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
warned++;
}
return SI_RET_OK;
}
+/*
+	Disconnect callback. The fd reported as disconnected is looked up in (and
+	removed from) the fd2ep hash kept in the context. When an endpoint was
+	mapped to the fd it is flagged as not open and its socket is reset to -1
+	so that the next attempt to send forces a reconnect attempt.
+
+	vctx is the context (uta_ctx_t) passed as a void pointer; a nil context
+	is tolerated and simply ignored. SI_RET_OK is returned in every case.
+
+	Future: put the ep on a queue to automatically attempt to reconnect.
+*/
+static int mt_disc_cb( void* vctx, int fd ) {
+	uta_ctx_t*	ctx = (uta_ctx_t *) vctx;
+	endpoint_t*	ep;
+
+	if( ctx != NULL ) {
+		if( (ep = fd2ep_del( ctx, fd )) != NULL ) {		// find ep and remove the fd from the hash
+			ep->open = FALSE;
+			ep->nn_sock = -1;
+		}
+	}
+
+	return SI_RET_OK;
+}
+
/*
This is expected to execute in a separate thread. It is responsible for
}
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mt_receive: registering SI95 data callback and waiting\n" );
+
SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx ); // our callback called only for "cooked" (tcp) data
+ SIcbreg( ctx->si_ctx, SI_CB_DISC, mt_disc_cb, vctx ); // our callback for handling disconnects
+
SIwait( ctx->si_ctx );
return NULL; // keep the compiler happy though never can be reached as SI wait doesn't return
*/
if( (nn_sock = msg->rts_fd) < 0 ) {
if( HDR_VERSION( msg->header ) > 2 ) { // with ver2 the ip is there, try if src name not known
- sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep, ctx->si_ctx );
+ //sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep, ctx->si_ctx );
+ sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep );
}
if( ! sock_ok ) {
msg->state = RMR_ERR_NOENDPT;
}
}
-
msg->state = RMR_OK; // ensure it is clear before send
hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src ); // the dest where we're returning the message to
hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip ); // both the src host and src ip
rmr_set_vlevel( RMR_VL_INFO ); // we WILL announce our version etc
if( ! announced ) {
- rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/b mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
+ rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/c mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
announced = 1;
}
rmr_vlog( RMR_VL_INFO, "receive ring locking disabled by user application\n" );
}
init_mtcall( ctx ); // set up call chutes
+ fd2ep_init( ctx ); // initialise the fd to endpoint sym tab
ctx->max_plen = RMR_MAX_RCV_BYTES; // max user payload lengh
/*
	Note that an endpoint is no longer connected by clearing its open flag.
	A nil endpoint pointer is ignored. When DEBUG is set, a debug level
	message naming the endpoint is written before the flag is cleared.
*/
static void uta_ep_failed( endpoint_t* ep ) {
if( ep != NULL ) {
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "connection to %s was closed\n", ep->name );
-		ep->open = 0;
+		ep->open = FALSE;
}
}
}
uta_link2( ctx->si_ctx, ep );
+ if( ep->open ) {
+ fd2ep_add( ctx, ep->nn_sock, ep ); // map fd to ep for disc cleanup
+ }
return ep->open;
}
the user pointer passed in and sets the return value to true (1). If the
endpoint cannot be found false (0) is returned.
*/
-static int uta_epsock_byname( route_table_t* rt, char* ep_name, int* nn_sock, endpoint_t** uepp, si_ctx_t* si_ctx ) {
- endpoint_t* ep;
- int state = FALSE;
-
- if( rt == NULL ) {
- return FALSE;
+static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpoint_t** uepp ) {
+ route_table_t* rt;
+ si_ctx_t* si_ctx;
+ endpoint_t* ep;
+ int state = FALSE;
+
+ if( PARINOID_CHECKS ) {
+ if( ctx == NULL || (rt = ctx->rtable) == NULL || (si_ctx = ctx->si_ctx) == NULL ) {
+ return FALSE;
+ }
+ } else {
+ rt = ctx->rtable; // faster but more risky
+ si_ctx = ctx->si_ctx;
}
ep = rmr_sym_get( rt->hash, ep_name, 1 );
state = TRUE;
ep->open = TRUE;
*nn_sock = ep->nn_sock; // pass socket back to caller
+ fd2ep_add( ctx, ep->nn_sock, ep ); // map fd to this ep for disc cleanup
}
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_bn: connection state: %s %s\n", state ? "[OK]" : "[FAIL]", ep->name );
} else {
Both of these, in the grand scheme of things, is minor compared to the
overhead of grabbing a lock on each call.
*/
-static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, int* nn_sock, endpoint_t** uepp, si_ctx_t* si_ctx ) {
- endpoint_t* ep; // seected end point
+static int uta_epsock_rr( uta_ctx_t* ctx, rtable_ent_t* rte, int group, int* more, int* nn_sock, endpoint_t** uepp ) {
+ si_ctx_t* si_ctx;
+ endpoint_t* ep; // selected end point
int state = FALSE; // processing state
int dummy;
rrgroup_t* rrg;
int idx;
+ if( PARINOID_CHECKS ) {
+ if( ctx == NULL || (si_ctx = ctx->si_ctx) == NULL ) {
+ return FALSE;
+ }
+ } else {
+ si_ctx = ctx->si_ctx;
+ }
+
//fprintf( stderr, ">>>> epsock_rr selecting: grp=%d mtype=%d ngrps=%d\n", group, rte->mtype, rte->nrrgroups );
if( ! more ) { // eliminate cheks each time we need to use
if( uta_link2( si_ctx, ep ) ) { // find entry in table and create link
ep->open = TRUE;
*nn_sock = ep->nn_sock; // pass socket back to caller
+ fd2ep_add( ctx, ep->nn_sock, ep ); // map fd to ep for disc cleanup
} else {
state = FALSE;
}
return rs;
}
+
+// ---- fd to ep functions --------------------------------------------------------------------------
+
+/*
+	Allocate the symbol table which maps file descriptors to endpoints and
+	hang it off of the context. The table is what lets an endpoint be marked
+	as disconnected when we are notified about its fd.
+
+	Nothing is done when the context is nil, or when the context already
+	references a table.
+*/
+static void fd2ep_init( uta_ctx_t* ctx ) {
+	if( ctx == NULL || ctx->fd2ep != NULL ) {
+		return;
+	}
+
+	ctx->fd2ep = rmr_sym_alloc( 129 );
+}
+
+/*
+	Map the file descriptor to the given endpoint in the context's fd2ep
+	hash. The call is a no-op when the context is nil or the hash has not
+	been created.
+*/
+static void fd2ep_add( uta_ctx_t* ctx, int fd, endpoint_t* ep ) {
+	if( ctx == NULL || ctx->fd2ep == NULL ) {
+		return;
+	}
+
+	rmr_sym_map( ctx->fd2ep, (uint64_t) fd, (void *) ep );
+}
+
+/*
+	Look up the endpoint mapped to the file descriptor and, when one is
+	found, drop the fd's entry from the hash (used when a disconnect is
+	detected). The endpoint that was mapped is returned; nil is returned
+	when there is no context, no hash, or no entry for the fd.
+*/
+static endpoint_t* fd2ep_del( uta_ctx_t* ctx, int fd ) {
+	endpoint_t* found;
+
+	if( ctx == NULL || ctx->fd2ep == NULL ) {
+		return NULL;
+	}
+
+	found = rmr_sym_pull( ctx->fd2ep, (uint64_t) fd );
+	if( found != NULL ) {
+		rmr_sym_ndel( ctx->fd2ep, (uint64_t) fd );
+	}
+
+	return found;
+}
+
+/*
+	Fetch the endpoint which is mapped to the file descriptor; the hash is
+	left unchanged. Nil is returned when there is no context, no hash, or
+	no mapping for the fd.
+*/
+static endpoint_t* fd2ep_get( uta_ctx_t* ctx, int fd ) {
+	if( ctx != NULL && ctx->fd2ep != NULL ) {
+		return rmr_sym_pull( ctx->fd2ep, (uint64_t) fd );
+	}
+
+	return NULL;
+}
+
+
#endif
errno = EBUSY;
status = SI_ERR_BLOCKED;
}
+ } else {
+			errno = EBADFD;										// fd in a bad state (probably closed)
}
return status;
send_again = 1; // force loop entry
group = 0; // always start with group 0
while( send_again ) {
- sock_ok = uta_epsock_rr( rte, group, &send_again, &nn_sock, &ep, ctx->si_ctx ); // select endpt from rr group and set again if more groups
+ sock_ok = uta_epsock_rr( ctx, rte, group, &send_again, &nn_sock, &ep ); // select endpt from rr group and set again if more groups
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n",
msg->flags, msg->mtype, send_again, group, msg->len, sock_ok );