if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
rmr_free_msg( mbuf ); // drop if ring is full
if( !warned ) {
- rmr_vlog( RMR_VL_WARN, "rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
+ rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
warned++;
}
return SI_RET_OK;
}
+/*
+ Callback driven on a disconnect notification. We will attempt to find the related
+ endpoint via the fd2ep hash maintained in the context. If we find it, then we
+ remove it from the hash, and mark the endpoint as closed so that the next attempt
+ to send forces a reconnect attempt.
+
+ Future: put the ep on a queue to automatically attempt to reconnect.
+*/
+static int mt_disc_cb( void* vctx, int fd ) {
+ uta_ctx_t* ctx;
+ endpoint_t* ep;
+
+ if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+ return SI_RET_OK;
+ }
+
+ ep = fd2ep_del( ctx, fd ); // find ep and remote the fd from the hash
+ if( ep ) {
+ ep->open = FALSE;
+ ep->nn_sock = -1;
+ }
+
+ return SI_RET_OK;
+}
+
/*
This is expected to execute in a separate thread. It is responsible for
}
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mt_receive: registering SI95 data callback and waiting\n" );
+
SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx ); // our callback called only for "cooked" (tcp) data
+ SIcbreg( ctx->si_ctx, SI_CB_DISC, mt_disc_cb, vctx ); // our callback for handling disconnects
+
SIwait( ctx->si_ctx );
return NULL; // keep the compiler happy though never can be reached as SI wait doesn't return