Updates to examples queries. Improve real-timeness of lfta aggregation
[com/gs-lite.git] / src / ftacmp / generate_lfta_code.cc
index 61872c4..5ec4167 100644 (file)
@@ -464,6 +464,7 @@ string generate_fta_struct(string node_name, gb_table *gb_tbl,
                ret+="\tgs_uint32_t *aggr_table_hashmap; // hash val, plus control info.\n";
 //             ret+="\tint bitmap_size;\n";
                ret += "\tint n_aggrs; // # of non-empty slots in aggr_table\n";
+               ret += "\tgs_int32_t n_ticks; // for limiting slow flush\n";
                ret += "\tint max_aggrs; // size of aggr_table and its hashmap.\n";
                ret += "\tint max_windows; // max number of open windows.\n";
                ret += "\tunsigned int generation; // initially zero, increment on\n";
@@ -2139,7 +2140,16 @@ string generate_fta_clock(string node_name, table_list *schema, unsigned time_co
                        sprintf(tmpstr,"\t\tt->last_gb_%d = gb_attr_%d;\n",g,g);                        ret += tmpstr;
                      }
                 }
+               ret += "\t\tt->n_ticks = 0; // reset clock tick counter, limit slow flush\n";
+               ret += "\t}else{\n";
+               ret += "//\tLimit slow flush, do a full flush at two clock ticks past the change in generation.\n";
+               ret += "\t\tt->n_ticks++;\n";
+               ret += "\t\tif(t->n_ticks == 2){\n";
+               ret += "\t\t\tif(t->flush_pos<t->max_aggrs) \n";
+               ret += "\t\t\t\tfta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
+               ret += "\t\t}\n";
                ret += "\t}\n\n";
+               
 
        }
 
@@ -2299,6 +2309,7 @@ string generate_aggr_accept_prelim(qp_node *fs, string node_name, table_list *sc
                  temporal_flush+="\t\t/* \t\tmark all groups as old */\n";
                  temporal_flush+="\t\tt->generation++;\n";
                  temporal_flush+="\t\tt->flush_pos = 0;\n";
+                 temporal_flush+="\t\tt->n_ticks = 0; // reset clock tick counter, to limit slow flush\n";
 
 
 //                             Now set the saved temporal value of the gb to the
@@ -4161,6 +4172,7 @@ string generate_fta_alloc(qp_node *fs, string node_name, table_list *schema, boo
 
        if(is_aggr_query){
                ret += "\tf->n_aggrs = 0;\n";
+               ret += "\tf->n_ticks = 0; // for limiting slow flush\n";
 
                ret += "\tf->max_aggrs = ";