groupby_func func;
hash_table<group*, aggregate*, hasher_func, equal_func> group_table;
typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator flush_pos;
+ gs_int32_t nflushes;
// Push out completed groups
- // create buffer on the stack to store key object
+// create buffer on the stack to store key object
char buffer[sizeof(group)];
+// Number of flushes required
- // extract the key information from the tuple and
- // copy it into buffer
+// extract the key information from the tuple and
+// copy it into buffer
group* grp = func.create_group(tup, buffer);
-/*// Ignore temp tuples until we can fix their timestamps.
-if (func.temp_status_received()) {
- tup.free_tuple();
- return 0;
-}
-*/
-
+ nflushes = func.flush_needed();
+
if (!grp) {
- if (func.flush_needed()){
+ if (nflushes>0){
flush(result);
}
if (func.temp_status_received()) {
return 0;
}
- if (func.flush_needed()) {
+ if (nflushes>0) {
flush(result);
}
typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
virtual int flush(list<host_tuple>& result) {
host_tuple tup;
typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
-// If the old table isn't empty, flush it now.
- for (flush_pos = group_table.begin(); flush_pos != group_table.end(); ) {
- bool failed = false;
- tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
- if (!failed) {
- tup.channel = output_channel;
- result.push_back(tup);
- }
- if(func.cleaning_when((*flush_pos).first,(*flush_pos).second)){
- group* g = (*flush_pos).first;
- aggregate* a = (*flush_pos).second;
- ++flush_pos;
- group_table.erase(g);
- delete (g);
- delete (a);
+
+// Limit the number of successive flushes - avoid explosive behavior
+ const gs_int32_t max_flushes = 10;
+ if(nflushes>max_flushes){
+ fprintf(stderr,"Warning in operator %s, temporal advance of %d windows needed, max number of windows that can be reported at once is %d\n",op_name, nflushes, max_flushes);
+ nflushes = max_flushes;
+ }
+
+ for(gs_int32_t flush_no = 0; flush_no < nflushes; ++flush_no){
+// advance the TB for the reinit
+ if(flush_no < nflushes-1){
+ func.advance_last_tb();
}else{
- func.reinit_aggregates((*flush_pos).first, (*flush_pos).second);
- ++flush_pos;
+ func.reset_last_tb(); // Move to current tb in case flush limit reached
+ }
+// If the old table isn't empty, flush it now.
+ for (flush_pos = group_table.begin(); flush_pos != group_table.end(); ) {
+ bool failed = false;
+ tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
+ if (!failed) {
+ tup.channel = output_channel;
+ result.push_back(tup);
+ }
+ if(func.cleaning_when((*flush_pos).first,(*flush_pos).second)){
+ group* g = (*flush_pos).first;
+ aggregate* a = (*flush_pos).second;
+ ++flush_pos;
+ group_table.erase(g);
+ delete (g);
+ delete (a);
+ }else{
+ func.reinit_aggregates((*flush_pos).first, (*flush_pos).second);
+ ++flush_pos;
+ }
}
}
child_qpn->table_name = new tablevar_t(
from[f]->get_interface().c_str(), from[f]->get_schema_name().c_str(), from[f]->get_ifq());
child_qpn->table_name->set_range_var(from[f]->get_var_name());
+ child_qpn->table_name->set_machine(from[f]->get_machine());
child_vec.push_back(child_qpn);
select_vec.push_back(&(child_qpn->select_list));
return(ret);
}
+static string generate_lt_test(string &lhs_op, string &rhs_op, data_type *dt){ // Build the text of a C++ "lhs_op < rhs_op" test for operands of type dt.
+ string ret; // text of the generated test expression
+// Types needing a complex comparison get a call to the type's hfta comparison function; all others get a plain "<".
+ if(dt->complex_comparison(dt) ){
+ ret.append(dt->get_hfta_comparison_fcn(dt)); // name of the comparison function to call
+ ret.append("(");
+ if(dt->is_buffer_type() )
+ ret.append("&"); // buffer types: pass the address of the lhs operand
+ ret.append(lhs_op);
+ ret.append(", ");
+ if(dt->is_buffer_type() )
+ ret.append("&"); // likewise for the rhs operand
+ ret.append(rhs_op );
+ ret.append(") == 1"); // NOTE(review): treats a result of exactly 1 as "lhs < rhs" -- confirm against the comparison function's return convention
+ }else{
+ ret.append(lhs_op ); // simple types: emit "lhs < rhs" directly
+ ret.append(" < ");
+ ret.append(rhs_op );
+ }
+// Hand the assembled expression text back to the caller.
+ return(ret);
+}
+
static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){
string ret;
for(g=0;g<gb_tbl.size();g++){
data_type *gdt = gb_tbl.get_data_type(g);
if(gdt->is_temporal()){
- sprintf(tmpstr,"last_gb%d",g);
+ sprintf(tmpstr,"curr_gb%d",g);
ret+="\t"+gb_tbl.get_data_type(g)->make_host_cvar(tmpstr)+";\n";
- sprintf(tmpstr,"last_flushed_gb%d",g);
+ sprintf(tmpstr,"last_gb%d",g);
ret+="\t"+gb_tbl.get_data_type(g)->make_host_cvar(tmpstr)+";\n";
}
}
- ret += "\tbool needs_temporal_flush;\n";
+ ret += "\tgs_int32_t needs_temporal_flush;\n";
}
// The publicly exposed functions
// temporal flush variables
// ASSUME that structured values won't be temporal.
+ gs_int32_t temporal_gb = 0;
if(uses_temporal_flush){
ret += "//\t\tInitialize temporal flush variables.\n";
for(g=0;g<gb_tbl.size();g++){
literal_t gl(gdt->type_indicator());
sprintf(tmpstr,"\tlast_gb%d = %s;\n",g, gl.to_hfta_C_code("").c_str());
ret.append(tmpstr);
+ sprintf(tmpstr,"\tcurr_gb%d = %s;\n",g, gl.to_hfta_C_code("").c_str());
+ ret.append(tmpstr);
+ temporal_gb = g;
}
}
- ret += "\tneeds_temporal_flush = false;\n";
+ ret += "\tneeds_temporal_flush = 0;\n";
}
// Init temporal attributes referenced in select list
// set flush indicator and update stored GB vars if there is any change.
if(uses_temporal_flush){
- ret+= "\tif( !( (";
+ ret+= "\tif( ( (";
bool first_one = true;
for(g=0;g<gb_tbl.size();g++){
data_type *gdt = gb_tbl.get_data_type(g);
if(gdt->is_temporal()){
- sprintf(tmpstr,"last_gb%d",g); string lhs_op = tmpstr;
+ sprintf(tmpstr,"curr_gb%d",g); string lhs_op = tmpstr;
sprintf(tmpstr,"gbval->gb_var%d",g); string rhs_op = tmpstr;
if(first_one){first_one = false;} else {ret += ") && (";}
- ret += generate_equality_test(lhs_op, rhs_op, gdt);
+ ret += generate_lt_test(lhs_op, rhs_op, gdt);
}
}
ret += ") ) ){\n";
for(g=0;g<gb_tbl.size();g++){
data_type *gdt = gb_tbl.get_data_type(g);
if(gdt->is_temporal()){
- if(gdt->is_buffer_type()){
+ temporal_gb = g;
+ if(gdt->is_buffer_type()){ // TODO first, last? or delete?
sprintf(tmpstr,"\t\t%s(&(gbval->gb_var%d),&last_gb%d);\n",gdt->get_hfta_buffer_replace().c_str(),g,g);
}else{
- sprintf(tmpstr,"\t\tlast_flushed_gb%d = last_gb%d;\n",g,g);
- ret += tmpstr;
- sprintf(tmpstr,"\t\tlast_gb%d = gbval->gb_var%d;\n",g,g);
+ ret += "\t\tif(curr_gb"+to_string(g)+"==0){\n";
+ ret += "\t\t\tlast_gb"+to_string(g)+" = gbval->gb_var"+to_string(g)+";\n";
+ ret += "\t\t}else{\n";
+ ret += "\t\t\tlast_gb"+to_string(g)+" = curr_gb"+to_string(g)+";\n";
+ ret += "\t\t}\n";
+ sprintf(tmpstr,"\t\tcurr_gb%d = gbval->gb_var%d;\n",g,g);
}
ret += tmpstr;
}
}
- ret += "\t\tneeds_temporal_flush=true;\n";
+ ret += "\t\tneeds_temporal_flush = curr_gb"+to_string (temporal_gb)+" - last_gb"+to_string(temporal_gb)+";\n";
ret += "\t\t}else{\n"
- "\t\t\tneeds_temporal_flush=false;\n"
+ "\t\t\tneeds_temporal_flush=0;\n"
"\t\t}\n";
}
//---------------------------------------------------
// Flush test
- ret += "\tbool flush_needed(){\n";
+ ret += "gs_int32_t flush_needed(){\n";
if(uses_temporal_flush){
- ret += "\t\treturn needs_temporal_flush;\n";
+ ret += "\treturn needs_temporal_flush;\n";
}else{
- ret += "\t\treturn false;\n";
+ ret += "\treturn 0;\n";
}
- ret += "\t};\n";
+ ret += "};\n";
+
+//------------------------------------------------
+// time bucket management
+ ret += "void advance_last_tb(){\n";
+ ret += "\tlast_gb"+to_string(temporal_gb)+"++;\n";
+ ret += "}\n\n";
+ ret += "void reset_last_tb(){\n";
+ ret += "\tlast_gb"+to_string(temporal_gb)+" = curr_gb"+to_string(temporal_gb)+";\n";
+ ret += "}\n\n";
//---------------------------------------------------
// create output tuple
if(sdt->is_temporal()){
sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
ret += tmpstr;
- sprintf(tmpstr,"(flush_finished) ? %s : %s ", generate_se_code(select_list[s]->se,schema).c_str(), generate_se_code_fm_aggr(select_list[s]->se,"last_flushed_gb", "", schema).c_str());
+ sprintf(tmpstr,"(flush_finished) ? %s : %s ", generate_se_code(select_list[s]->se,schema).c_str(), generate_se_code_fm_aggr(select_list[s]->se,"last_gb", "", schema).c_str());
ret += tmpstr;
ret += ";\n";
}