+static void dump_tables( uta_ctx_t *ctx ) {
+ if( ctx->old_rtable != NULL ) {
+ rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
+ rt_stats( ctx->old_rtable );
+ } else {
+ rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
+ }
+ rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
+ rt_stats( ctx->rtable );
+}
+
+// ------------ route manager communication -------------------------------------------------
+/*
+ Send a request for a table update to the route manager. Updates come in
+ async, so send and go.
+
+ pctx is the private context for the thread; ctx is the application context
+ that we need to be able to send the application ID in case rt mgr needs to
+ use it to idenfity us.
+
+ Returns 0 if we were not able to send a request.
+*/
+static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx ) {
+ rmr_mbuf_t* smsg;
+ int state = 0;
+
+ if( ctx->rtg_whid < 0 ) {
+ return state;
+ }
+
+ smsg = rmr_alloc_msg( pctx, 1024 );
+ if( smsg != NULL ) {
+ smsg->mtype = RMRRM_REQ_TABLE;
+ smsg->sub_id = 0;
+ snprintf( smsg->payload, 1024, "%s ts=%ld\n", ctx->my_name, time( NULL ) );
+ rmr_vlog( RMR_VL_INFO, "rmr_rtc: requesting table: (%s) whid=%d\n", smsg->payload, ctx->rtg_whid );
+ smsg->len = strlen( smsg->payload ) + 1;
+
+ smsg = rmr_wh_send_msg( pctx, ctx->rtg_whid, smsg );
+ if( (state = smsg->state) != RMR_OK ) {
+ rmr_vlog( RMR_VL_INFO, "rmr_rtc: send failed: %d whid=%d\n", smsg->state, ctx->rtg_whid );
+ rmr_wh_close( ctx, ctx->rtg_whid ); // send failed, assume connection lost
+ ctx->rtg_whid = -1;
+ }
+
+ rmr_free_msg( smsg );
+ }
+
+ return state;
+}
+
+/*
+ Send an ack to the route table manager for a table ID that we are
+ processing. State is 1 for OK, and 0 for failed. Reason might
+ be populated if we know why there was a failure.
+
+ Context should be the PRIVATE context that we use for messages
+ to route manger and NOT the user's context.
+
+ If a message buffere is passed we use that and use return to sender
+ assuming that this might be a response to a call and that is needed
+ to send back to the proper calling thread. If msg is nil, we allocate
+ and use it.
+*/
+static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* smsg, char* table_id, int state, char* reason ) {
+ int use_rts = 1;
+ int payload_size = 1024;
+
+ if( ctx == NULL || ctx->rtg_whid < 0 ) {
+ return;
+ }
+
+ if( ctx->flags & CFL_NO_RTACK ) { // don't ack if reading from file etc
+ return;
+ }
+
+ if( smsg != NULL ) {
+ smsg = rmr_realloc_payload( smsg, payload_size, FALSE, FALSE ); // ensure it's large enough to send a response
+ } else {
+ use_rts = 0;
+ smsg = rmr_alloc_msg( ctx, payload_size );
+ }
+
+ if( smsg != NULL ) {
+ smsg->mtype = RMRRM_TABLE_STATE;
+ smsg->sub_id = -1;
+ snprintf( smsg->payload, payload_size-1, "%s %s %s\n", state == RMR_OK ? "OK" : "ERR",
+ table_id == NULL ? "<id-missing>" : table_id, reason == NULL ? "" : reason );
+
+ smsg->len = strlen( smsg->payload ) + 1;
+
+ rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d table=%s\n", smsg->payload, state, ctx->rtg_whid, table_id );
+ if( use_rts ) {
+ smsg = rmr_rts_msg( ctx, smsg );
+ } else {
+ smsg = rmr_wh_send_msg( ctx, ctx->rtg_whid, smsg );
+ }
+ if( (state = smsg->state) != RMR_OK ) {
+ rmr_vlog( RMR_VL_WARN, "unable to send table state: %d\n", smsg->state );
+ rmr_wh_close( ctx, ctx->rtg_whid ); // send failed, assume connection lost
+ ctx->rtg_whid = -1;
+ }
+
+ if( ! use_rts ) {
+ rmr_free_msg( smsg ); // if not our message we must free the leftovers
+ }
+ }
+}
+
+// ---- alarm generation --------------------------------------------------------------------------
+
+/*
+ Given the user's context (not the thread private context) look to see if the application isn't
+ working fast enough and we're dropping messages. If the drop counter has changed since the last
+ peeked, and we have not raised an alarm, then we will alarm. If the counter hasn't changed, then we
+ set a timer and if the counter still hasn't changed when it expires we will clear the alarm.
+
+ The private context is what we use to send so as not to interfere with the user flow.
+*/
+static void alarm_if_drops( uta_ctx_t* uctx, uta_ctx_t* pctx ) {
+ static int alarm_raised = 0;
+ static int ok2clear = 0; // time that we can clear
+ static int lastd = 0; // the last counter value so we can compute delta
+ static int prob_id = 0; // problem ID we assume alarm manager handles dups between processes
+
+ rmr_vlog( RMR_VL_DEBUG, "checking for drops... raised=%d 0k2clear=%d lastd=%d probid=%d\n", alarm_raised, ok2clear, lastd, prob_id );
+ if( ! alarm_raised ) {
+ if( uctx->dcount - lastd == 0 ) { // not actively dropping, ok to do nothing
+ return;
+ }
+
+ alarm_raised = 1;
+ uta_alarm( pctx, ALARM_DROPS | ALARM_RAISE, prob_id, "application running slow; RMR is dropping messages" );
+ rmr_vlog( RMR_VL_INFO, "drop alarm raised" );
+ } else {
+ if( uctx->dcount - lastd != 0 ) { // still dropping or dropping again; we've alarmed so nothing to do
+ lastd = uctx->dcount;
+ ok2clear = 0; // reset the timer
+ return;
+ }
+
+ if( ok2clear == 0 ) { // first round where not dropping
+ ok2clear = time( NULL ) + 60; // we'll clear the alarm in 60s
+ } else {
+ if( time( NULL ) > ok2clear ) { // things still stable after expiry
+ rmr_vlog( RMR_VL_INFO, "drop alarm cleared\n" );
+ alarm_raised = 0;
+ uta_alarm( pctx, ALARM_DROPS | ALARM_CLEAR, prob_id, "RMR message dropping has stopped" );
+ prob_id++;
+ }
+ }
+ }
+}
+
+// ---- utility -----------------------------------------------------------------------------------
+
+int isspace_with_fence(int c) {
+ _mm_lfence();
+ return isspace( c );
+}
+