From: vlad shkapenyuk Date: Tue, 12 May 2020 13:42:58 +0000 (-0400) Subject: Update running groupby operator X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=7cec316889150a8a92238e52c7bad1270608b333;p=com%2Fgs-lite.git Update running groupby operator Signed-off-by: vlad shkapenyuk Change-Id: I4e0edbf2914a2b92edb3ade592a3fc1f774fd2b3 --- diff --git a/include/hfta/join_eq_hash_operator.h b/include/hfta/join_eq_hash_operator.h index 5038f7a..911074e 100644 --- a/include/hfta/join_eq_hash_operator.h +++ b/include/hfta/join_eq_hash_operator.h @@ -277,7 +277,7 @@ n_calls=0; n_iters=0; n_eqk=0; int tup_order=func.compare_ts_with_ts(&tup_ts,max_input_ts[tup.channel]); if (tup_order < 0){ -printf("out of order ts.\n"); +printf("%s: out of order ts.\n", op_name); tup.free_tuple(); // even for out of order temporal tuples we need to post new temporal tuple diff --git a/include/hfta/running_gb_operator.h b/include/hfta/running_gb_operator.h index 5bd7388..c20c87f 100644 --- a/include/hfta/running_gb_operator.h +++ b/include/hfta/running_gb_operator.h @@ -31,6 +31,7 @@ private : groupby_func func; hash_table group_table; typename hash_table::iterator flush_pos; + gs_int32_t nflushes; @@ -43,21 +44,17 @@ public: // Push out completed groups - // create buffer on the stack to store key object +// create buffer on the stack to store key object char buffer[sizeof(group)]; +// Number of flushes required - // extract the key information from the tuple and - // copy it into buffer +// extract the key information from the tuple and +// copy it into buffer group* grp = func.create_group(tup, buffer); -/*// Ignore temp tuples until we can fix their timestamps. -if (func.temp_status_received()) { - tup.free_tuple(); - return 0; -} -*/ - + nflushes = func.flush_needed(); + if (!grp) { - if (func.flush_needed()){ + if (nflushes>0){ flush(result); } if (func.temp_status_received()) { @@ -71,7 +68,7 @@ if (func.temp_status_received()) { return 0; } - if (func.flush_needed()) { + if (nflushes>0) { flush(result); } typename hash_table::iterator iter; @@ -95,24 +92,40 @@ if (func.temp_status_received()) { virtual int flush(list& result) { host_tuple tup; typename hash_table::iterator iter; -// If the old table isn't empty, flush it now. - for (flush_pos = group_table.begin(); flush_pos != group_table.end(); ) { - bool failed = false; - tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed); - if (!failed) { - tup.channel = output_channel; - result.push_back(tup); - } - if(func.cleaning_when((*flush_pos).first,(*flush_pos).second)){ - group* g = (*flush_pos).first; - aggregate* a = (*flush_pos).second; - ++flush_pos; - group_table.erase(g); - delete (g); - delete (a); + +// Limit the number of successive flushes - avoid explosive behavior + const gs_int32_t max_flushes = 10; + if(nflushes>max_flushes){ + fprintf(stderr,"Warning in operator %s, temporal advance of %d windows needed, max number of windows that can be reported at once is %d\n",op_name, nflushes, max_flushes); + nflushes = max_flushes; + } + + for(gs_int32_t flush_no = 0; flush_no < nflushes; ++flush_no){ +// advance the TB for the reinit + if(flush_no < nflushes-1){ + func.advance_last_tb(); }else{ - func.reinit_aggregates((*flush_pos).first, (*flush_pos).second); - ++flush_pos; + func.reset_last_tb(); // Move to current tb in case flush limit reached + } +// If the old table isn't empty, flush it now. + for (flush_pos = group_table.begin(); flush_pos != group_table.end(); ) { + bool failed = false; + tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed); + if (!failed) { + tup.channel = output_channel; + result.push_back(tup); + } + if(func.cleaning_when((*flush_pos).first,(*flush_pos).second)){ + group* g = (*flush_pos).first; + aggregate* a = (*flush_pos).second; + ++flush_pos; + group_table.erase(g); + delete (g); + delete (a); + }else{ + func.reinit_aggregates((*flush_pos).first, (*flush_pos).second); + ++flush_pos; + } } } diff --git a/include/lfta/rts_external.h b/include/lfta/rts_external.h index 358527e..d1fdf7b 100644 --- a/include/lfta/rts_external.h +++ b/include/lfta/rts_external.h @@ -69,6 +69,9 @@ gs_retval_t str_exists_substr( struct gs_string * str1, struct gs_string * str2) gs_retval_t str_compare( struct gs_string * str1, struct gs_string * str2); +/* String equality */ + +gs_retval_t str_equal( struct gs_string * str1, struct gs_string * str2); /* Construct a string constant */ diff --git a/src/ftacmp/query_plan.cc b/src/ftacmp/query_plan.cc index 512299f..2e2286e 100644 --- a/src/ftacmp/query_plan.cc +++ b/src/ftacmp/query_plan.cc @@ -4914,6 +4914,7 @@ vector join_eq_hash_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, t child_qpn->table_name = new tablevar_t( from[f]->get_interface().c_str(), from[f]->get_schema_name().c_str(), from[f]->get_ifq()); child_qpn->table_name->set_range_var(from[f]->get_var_name()); + child_qpn->table_name->set_machine(from[f]->get_machine()); child_vec.push_back(child_qpn); select_vec.push_back(&(child_qpn->select_list)); @@ -7032,6 +7033,29 @@ static string generate_equality_test(string &lhs_op, string &rhs_op, data_type * return(ret); } +static string generate_lt_test(string &lhs_op, string &rhs_op, data_type *dt){ + string ret; + + if(dt->complex_comparison(dt) ){ + ret.append(dt->get_hfta_comparison_fcn(dt)); + ret.append("("); + if(dt->is_buffer_type() ) + ret.append("&"); + ret.append(lhs_op); + ret.append(", "); + if(dt->is_buffer_type() ) + ret.append("&"); + ret.append(rhs_op ); + ret.append(") == 1"); + }else{ + ret.append(lhs_op ); + ret.append(" < "); + ret.append(rhs_op ); + } + + return(ret); +} + static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){ string ret; @@ -12863,13 +12887,13 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a]; for(g=0;gis_temporal()){ - sprintf(tmpstr,"last_gb%d",g); + sprintf(tmpstr,"curr_gb%d",g); ret+="\t"+gb_tbl.get_data_type(g)->make_host_cvar(tmpstr)+";\n"; - sprintf(tmpstr,"last_flushed_gb%d",g); + sprintf(tmpstr,"last_gb%d",g); ret+="\t"+gb_tbl.get_data_type(g)->make_host_cvar(tmpstr)+";\n"; } } - ret += "\tbool needs_temporal_flush;\n"; + ret += "\tgs_int32_t needs_temporal_flush;\n"; } // The publicly exposed functions @@ -12907,6 +12931,7 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a]; // temporal flush variables // ASSUME that structured values won't be temporal. + gs_int32_t temporal_gb = 0; if(uses_temporal_flush){ ret += "//\t\tInitialize temporal flush variables.\n"; for(g=0;gtype_indicator()); sprintf(tmpstr,"\tlast_gb%d = %s;\n",g, gl.to_hfta_C_code("").c_str()); ret.append(tmpstr); + sprintf(tmpstr,"\tcurr_gb%d = %s;\n",g, gl.to_hfta_C_code("").c_str()); + ret.append(tmpstr); + temporal_gb = g; } } - ret += "\tneeds_temporal_flush = false;\n"; + ret += "\tneeds_temporal_flush = 0;\n"; } // Init temporal attributes referenced in select list @@ -13031,35 +13059,39 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a]; // set flush indicator and update stored GB vars if there is any change. if(uses_temporal_flush){ - ret+= "\tif( !( ("; + ret+= "\tif( ( ("; bool first_one = true; for(g=0;gis_temporal()){ - sprintf(tmpstr,"last_gb%d",g); string lhs_op = tmpstr; + sprintf(tmpstr,"curr_gb%d",g); string lhs_op = tmpstr; sprintf(tmpstr,"gbval->gb_var%d",g); string rhs_op = tmpstr; if(first_one){first_one = false;} else {ret += ") && (";} - ret += generate_equality_test(lhs_op, rhs_op, gdt); + ret += generate_lt_test(lhs_op, rhs_op, gdt); } } ret += ") ) ){\n"; for(g=0;gis_temporal()){ - if(gdt->is_buffer_type()){ + temporal_gb = g; + if(gdt->is_buffer_type()){ // TODO first, last? or delete? sprintf(tmpstr,"\t\t%s(&(gbval->gb_var%d),&last_gb%d);\n",gdt->get_hfta_buffer_replace().c_str(),g,g); }else{ - sprintf(tmpstr,"\t\tlast_flushed_gb%d = last_gb%d;\n",g,g); - ret += tmpstr; - sprintf(tmpstr,"\t\tlast_gb%d = gbval->gb_var%d;\n",g,g); + ret += "\t\tif(curr_gb"+to_string(g)+"==0){\n"; + ret += "\t\t\tlast_gb"+to_string(g)+" = gbval->gb_var"+to_string(g)+";\n"; + ret += "\t\t}else{\n"; + ret += "\t\t\tlast_gb"+to_string(g)+" = curr_gb"+to_string(g)+";\n"; + ret += "\t\t}\n"; + sprintf(tmpstr,"\t\tcurr_gb%d = gbval->gb_var%d;\n",g,g); } ret += tmpstr; } } - ret += "\t\tneeds_temporal_flush=true;\n"; + ret += "\t\tneeds_temporal_flush = curr_gb"+to_string (temporal_gb)+" - last_gb"+to_string(temporal_gb)+";\n"; ret += "\t\t}else{\n" - "\t\t\tneeds_temporal_flush=false;\n" + "\t\t\tneeds_temporal_flush=0;\n" "\t\t}\n"; } @@ -13230,13 +13262,22 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a]; //--------------------------------------------------- // Flush test - ret += "\tbool flush_needed(){\n"; + ret += "gs_int32_t flush_needed(){\n"; if(uses_temporal_flush){ - ret += "\t\treturn needs_temporal_flush;\n"; + ret += "\treturn needs_temporal_flush;\n"; }else{ - ret += "\t\treturn false;\n"; + ret += "\treturn 0;\n"; } - ret += "\t};\n"; + ret += "};\n"; + +//------------------------------------------------ +// time bucket management + ret += "void advance_last_tb(){\n"; + ret += "\tlast_gb"+to_string(temporal_gb)+"++;\n"; + ret += "}\n\n"; + ret += "void reset_last_tb(){\n"; + ret += "\tlast_gb"+to_string(temporal_gb)+" = curr_gb"+to_string(temporal_gb)+";\n"; + ret += "}\n\n"; //--------------------------------------------------- // create output tuple @@ -13469,7 +13510,7 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a]; if(sdt->is_temporal()){ sprintf(tmpstr,"\ttuple->tuple_var%d = ",s); ret += tmpstr; - sprintf(tmpstr,"(flush_finished) ? %s : %s ", generate_se_code(select_list[s]->se,schema).c_str(), generate_se_code_fm_aggr(select_list[s]->se,"last_flushed_gb", "", schema).c_str()); + sprintf(tmpstr,"(flush_finished) ? %s : %s ", generate_se_code(select_list[s]->se,schema).c_str(), generate_se_code_fm_aggr(select_list[s]->se,"last_gb", "", schema).c_str()); ret += tmpstr; ret += ";\n"; } diff --git a/src/ftacmp/translate_fta.cc b/src/ftacmp/translate_fta.cc index b6855ff..31c552f 100644 --- a/src/ftacmp/translate_fta.cc +++ b/src/ftacmp/translate_fta.cc @@ -1907,7 +1907,11 @@ for(q=0;q0){ liface = tvec[0]->get_interface(); // iface queries have been resolved - lmach = tvec[0]->get_machine(); + if(tvec[0]->get_machine() != ""){ + lmach = tvec[0]->get_machine(); + }else{ + fprintf(stderr,"WARNING, lfta %s has empty machine name, using %s\n", split_queries[l]->query_plan[0]->node_name.c_str(), hostname.c_str()); + } } // else{ interface_names.push_back(liface); machine_names.push_back(lmach); diff --git a/src/lib/gscplftaaux/rts_string.c b/src/lib/gscplftaaux/rts_string.c index 8c31657..56c652a 100644 --- a/src/lib/gscplftaaux/rts_string.c +++ b/src/lib/gscplftaaux/rts_string.c @@ -110,15 +110,11 @@ gs_uint32_t byte_match_offset( gs_uint32_t offset, gs_uint32_t val, struct gs_st gs_retval_t str_compare( struct gs_string * str1, struct gs_string * str2) { gs_int32_t len; - gs_int32_t x; + gs_int32_t x, ret; len = (str1->length>str2->length)?str2->length:str1->length; for(x=0;xdata[x]>str2->data[x]) { - return 1; - } - if (str1->data[x]data[x]) { - return -1; - } + if (ret = (str1->data[x]-str2->data[x])) + return ret; } if (str1->length>str2->length) { @@ -130,6 +126,22 @@ gs_retval_t str_compare( struct gs_string * str1, struct gs_string * str2) return 0; } +gs_retval_t str_equal( struct gs_string * str1, struct gs_string * str2) +{ + gs_int32_t x; + + if (str1->length != str2->length) + return -1; + + for(x=0;xlength;x++) { + if (str1->data[x]!=str2->data[x]) { + return -1; + } + } + + return 0; +} + gs_retval_t str_constructor(struct gs_string *s, gs_sp_t l){ s->data = l; s->length = 0;