From 804ea15b01566ac0de58781ca61870b4824d0e02 Mon Sep 17 00:00:00 2001 From: vlad shkapenyuk Date: Thu, 13 Feb 2020 18:33:47 -0500 Subject: [PATCH] Fixes to filter join and gcc 7.x compilation errors Signed-off-by: vlad shkapenyuk Change-Id: I78bbf93d9550f5ed8a31a0a1cee3a6fb3a133059 --- src/ftacmp/analyze_fta.cc | 8 +- src/ftacmp/generate_lfta_code.cc | 35 ++++---- src/ftacmp/query_plan.cc | 2 +- src/ftacmp/query_plan.h | 6 +- src/ftacmp/stream_query.cc | 8 ++ src/ftacmp/translate_fta.cc | 167 +++++++++++++++++++++++---------------- src/lib/gscphftaaux/flip_udaf.cc | 8 +- 7 files changed, 145 insertions(+), 89 deletions(-) diff --git a/src/ftacmp/analyze_fta.cc b/src/ftacmp/analyze_fta.cc index 8c9fd3b..430dd47 100644 --- a/src/ftacmp/analyze_fta.cc +++ b/src/ftacmp/analyze_fta.cc @@ -4130,7 +4130,7 @@ query_summary_class *analyze_fta(table_exp_t *fta_tree, table_list *schema, tbl_vec[tbl_vec.size()-1]->set_property(1); if(jprop == FILTER_JOIN_PROPERTY){ if(fta_tree->get_from()->get_temporal_range() == 0){ - fprintf(stderr,"ERROR, a filter join must have a non-zero tempoal range.\n"); + fprintf(stderr,"ERROR, a filter join must have a non-zero temporal range.\n"); return NULL; } if(tbl_vec.size() != 2){ @@ -4150,8 +4150,10 @@ query_summary_class *analyze_fta(table_exp_t *fta_tree, table_list *schema, string type_name = schema->get_type_name(tbl_vec[0]->get_schema_ref(),field); param_list *modifiers = schema->get_modifier_list(cr->get_schema_ref(), field); data_type *dt0 = new data_type(type_name, modifiers); - if(dt0->get_type_str() != "UINT"){ - fprintf(stderr,"ERROR, the temporal attribute in a filter join must be a UINT.\n"); + string dt0_type = dt0->get_type_str(); + if(dt0_type != "INT" && dt0_type != "UINT" && dt0_type != "LLONG" && dt0_type != "ULLONG"){ +// if(dt0->get_type_str() != "UINT"){ + fprintf(stderr,"ERROR, the temporal attribute in a filter join must be one of INT/UINT/LLONG/ULLONG.\n"); return NULL; } if(! dt0->is_increasing()){ diff --git a/src/ftacmp/generate_lfta_code.cc b/src/ftacmp/generate_lfta_code.cc index 98d872d..aea03a8 100644 --- a/src/ftacmp/generate_lfta_code.cc +++ b/src/ftacmp/generate_lfta_code.cc @@ -237,7 +237,7 @@ string generate_fj_struct(filter_join_qpn *fs, string node_name ){ int k; for(k=0;khash_eq.size();++k){ sprintf(tmpstr,"key_var%d",k); - ret += "\t"+fs->hash_eq[k]->pr->get_left_se()->get_data_type()->make_cvar(tmpstr)+";\n"; + ret += "\t"+fs->hash_eq[k]->pr->get_right_se()->get_data_type()->make_cvar(tmpstr)+";\n"; } ret += "\tlong long int ts;\n"; ret += "};\n\n"; @@ -2360,9 +2360,13 @@ printf("n_bloom=%d, window_len=%d, bloom_width=%s, bf_exp_size=%d, bf_bit_size=% // bloom filter needs to be advanced. // SET_BF_EMPTY(table,number of bloom filters,bloom filter index,bit index) // t->bf_size : number of bits in bloom filter +// TODO: vectorize? +// TODO: Don't iterate more than n_bloom times! +// As written, its possible to wrap around many times. if(fs->use_bloom){ ret += "// Clean out old bloom filters if needed.\n" +"// TODO vectorize this ? \n" " if(t->first_exec){\n" " t->first_exec = 0;\n" " t->last_bin = (long long int)(curr_fj_ts/"+bloom_width_str+");\n" @@ -2493,6 +2497,7 @@ printf("n_bloom=%d, window_len=%d, bloom_width=%s, bf_exp_size=%d, bf_bit_size=% ; } }else{ + ret += "// Add the S record to the hash table, choose a position\n"; ret += "\t\tbucket=0;\n"; for(p=0;phash_eq.size();++p){ ret += @@ -2516,20 +2521,20 @@ printf("n_bloom=%d, window_len=%d, bloom_width=%s, bf_exp_size=%d, bf_bit_size=% ret += generate_equality_test(lhs_op,rhs_op,hdt); } ret += "){\n\t\t\tthe_bucket = bucket;\n"; - ret += "\t\t}else {if("; + ret += "\t\t}else{\n\t\t\tif("; for(p=0;phash_eq.size();++p){ if(p>0) ret += " && "; // ret += "t->join_table[bucket1].key_var"+int_to_string(p)+ // " == s_equijoin_"+int_to_string(p); data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type(); - string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p); + string lhs_op = "t->join_table[bucket1].key_var"+int_to_string(p); string rhs_op = "s_equijoin_"+int_to_string(p); ret += generate_equality_test(lhs_op,rhs_op,hdt); } - ret += "){\n\t\t\tthe_bucket = bucket1;\n"; - ret += "\t\t}else{ if(t->join_table[bucket].ts <= t->join_table[bucket1].ts)\n"; - ret+="\t\t\tthe_bucket = bucket;\n\t\t\telse the_bucket=bucket1;\n"; - ret += "\t\t}}\n"; + ret += "){\n\t\t\t\tthe_bucket = bucket1;\n"; + ret += "\t\t\t}else{ \n\t\t\t\tif(t->join_table[bucket].ts <= t->join_table[bucket1].ts)\n"; + ret+="\t\t\t\t\tthe_bucket = bucket;\n\t\t\t\telse\n\t\t\t\t\tthe_bucket=bucket1;\n"; + ret += "\t\t\t}\n\t\t}\n"; for(p=0;phash_eq.size();++p){ data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type(); if(hdt->is_buffer_type()){ @@ -2569,7 +2574,7 @@ printf("n_bloom=%d, window_len=%d, bloom_width=%s, bf_exp_size=%d, bf_bit_size=% } } -// Sort S preds based on cost. +// Sort R preds based on cost. vector tmp_wh; for(w=0;whash_eq.size();++p) - ret += "\t\tr_equijoin_"+int_to_string(p)+" = "+generate_se_code(fs->hash_eq[p]->pr->get_left_se(),schema)+";\n"; + ret += "\tr_equijoin_"+int_to_string(p)+" = "+generate_se_code(fs->hash_eq[p]->pr->get_left_se(),schema)+";\n"; // Passed the cheap pred, now test the join with S. @@ -2637,7 +2642,7 @@ printf("n_bloom=%d, window_len=%d, bloom_width=%s, bf_exp_size=%d, bf_bit_size=% ret += " bucket"+int_to_string(i)+ " ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+ - fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+ + fs->hash_eq[p]->pr->get_left_se()->get_data_type()->get_type_str()+ +"_to_hash(r_equijoin_"+int_to_string(p)+"))>>32);\n"; } ret += @@ -2662,7 +2667,7 @@ printf("n_bloom=%d, window_len=%d, bloom_width=%s, bf_exp_size=%d, bf_bit_size=% for(p=0;phash_eq.size();++p){ ret += " bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+ - fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+ + fs->hash_eq[p]->pr->get_left_se()->get_data_type()->get_type_str()+ +"_to_hash(r_equijoin_"+int_to_string(p)+"))>>32);\n"; } ret += @@ -2689,7 +2694,7 @@ printf("n_bloom=%d, window_len=%d, bloom_width=%s, bf_exp_size=%d, bf_bit_size=% // ret += "t->join_table[bucket1].key_var"+int_to_string(p)+ // " == r_equijoin_"+int_to_string(p); data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type(); - string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p); + string lhs_op = "t->join_table[bucket1].key_var"+int_to_string(p); string rhs_op = "s_equijoin_"+int_to_string(p); ret += generate_equality_test(lhs_op,rhs_op,hdt); } @@ -3170,7 +3175,9 @@ string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ex bool uses_bloom = fjq->use_bloom; ret += "/*\t\tJoin fields\t*/\n"; for(g=0;ghash_eq.size();g++){ - sprintf(tmpstr,"\t%s s_equijoin_%d, r_equijoin_%d;\n",fjq->hash_eq[g]->pr->get_left_se()->get_data_type()->get_cvar_type().c_str(),g,g); + sprintf(tmpstr,"\t%s s_equijoin_%d;\n",fjq->hash_eq[g]->pr->get_right_se()->get_data_type()->get_cvar_type().c_str(),g); + ret += tmpstr; + sprintf(tmpstr,"\t%s r_equijoin_%d;\n",fjq->hash_eq[g]->pr->get_left_se()->get_data_type()->get_cvar_type().c_str(),g); ret += tmpstr; } if(uses_bloom){ @@ -3179,7 +3186,7 @@ string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ex "\tunsigned int i=0,j=0,k=0, b, bf_clean = 0, tmp_i, found; \n" "\tunsigned int bucket, bucket0, bucket1, bucket2;\n" "\tlong long int curr_fj_ts;\n" -"\tunsigned int curr_bin, the_bin;\n" +"\tlong long int curr_bin, the_bin;\n" "\n" ; }else{ diff --git a/src/ftacmp/query_plan.cc b/src/ftacmp/query_plan.cc index 718e7be..e3ce4be 100644 --- a/src/ftacmp/query_plan.cc +++ b/src/ftacmp/query_plan.cc @@ -13609,7 +13609,7 @@ void mrg_qpn::create_protocol_se(vector q_sources, table_list *Schema for(s=1;s &ifpnames); - +// CONSTRUCTOR filter_join_qpn(){ }; filter_join_qpn(query_summary_class *qs,table_list *Schema){ @@ -1638,6 +1638,10 @@ public: err_str += tmpstr; error_code = 1; } + if(from[0]->get_interface() != from[1]->get_interface()){ + err_str += "Error building filter_join_qpn node: all range variables must be sourced from the same interface or interface set ("+from[0]->get_interface()+" vs. "+from[1]->get_interface()+")\n"; + error_code = 1; + } // Get the select list. select_list = qs->fta_tree->get_sl_vec(); diff --git a/src/ftacmp/stream_query.cc b/src/ftacmp/stream_query.cc index 17d4293..9d12cb0 100644 --- a/src/ftacmp/stream_query.cc +++ b/src/ftacmp/stream_query.cc @@ -1949,6 +1949,14 @@ void get_prefilter_temporal_cids(std::vector lfta_list, col_id_s gb_tbl = sgah_node->get_gb_tbl(); } + if(lfta_list[s]->query_plan[0]->node_type() == "filter_join"){ + filter_join_qpn *fj_node = (filter_join_qpn *)lfta_list[s]->query_plan[0]; + sl_list = fj_node->get_select_se_list(); + col_id ci; // also get the temporal var in case not in select list + ci.load_from_colref(fj_node->temporal_var); + temp_cids.insert(ci); + } + for(sl=0;slget_data_type(); if (sdt->is_temporal()) { diff --git a/src/ftacmp/translate_fta.cc b/src/ftacmp/translate_fta.cc index 091fafe..41df197 100644 --- a/src/ftacmp/translate_fta.cc +++ b/src/ftacmp/translate_fta.cc @@ -1897,84 +1897,117 @@ for(q=0;q tvec = split_queries[l]->query_plan[0]->get_input_tbls(); string liface = tvec[0]->get_interface(); // iface queries have been resolved string lmach = tvec[0]->get_machine(); - string schema_name = tvec[0]->get_schema_name(); - int schema_ref = tvec[0]->get_schema_ref(); - if (lmach == "") - lmach = hostname; - interface_names.push_back(liface); - machine_names.push_back(lmach); + + vector schemaid_preds; + for(int irv=0;irvget_schema_name(); + string rvar_name = tvec[irv]->get_var_name(); + int schema_ref = tvec[irv]->get_schema_ref(); + if (lmach == "") + lmach = hostname; + interface_names.push_back(liface); + machine_names.push_back(lmach); //printf("Machine is %s\n",lmach.c_str()); // Check if a schemaId constraint needs to be inserted. - if(schema_ref<0){ // can result from some kinds of splits - schema_ref = Schema->get_table_ref(schema_name); - } - int schema_id = Schema->get_schema_id(schema_ref); // id associated with PROTOCOL - int errnum = 0; - string if_error; - iface_t *iface = ifaces_db-> get_interface(lmach, liface, errnum, if_error); - if(iface==NULL){ - fprintf(stderr,"Error looking up the interface %s on host %s: %s\n",liface.c_str(), lmach.c_str(), if_error.c_str()); - exit(1); - } - if(iface->has_multiple_schemas()){ - if(schema_id<0){ // invalid schema_id - fprintf(stderr,"Error, schema %s has no schema_id, but is sourced from multi-schema interface %s (ref'd in query %s)\n", schema_name.c_str(), liface.c_str(), split_queries[0]->query_name.c_str()); - exit(1); + if(schema_ref<0){ // can result from some kinds of splits + schema_ref = Schema->get_table_ref(schema_name); } - vector iface_schemas = iface->get_property("Schemas"); - if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){ - fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str()); + int schema_id = Schema->get_schema_id(schema_ref); // id associated with PROTOCOL + int errnum = 0; + string if_error; + iface_t *iface = ifaces_db-> get_interface(lmach, liface, errnum, if_error); + if(iface==NULL){ + fprintf(stderr,"Error looking up the interface %s on host %s: %s\n",liface.c_str(), lmach.c_str(), if_error.c_str()); exit(1); - } + } + if(iface->has_multiple_schemas()){ + if(schema_id<0){ // invalid schema_id + fprintf(stderr,"Error, schema %s has no schema_id, but is sourced from multi-schema interface %s (ref'd in query %s)\n", schema_name.c_str(), liface.c_str(), split_queries[0]->query_name.c_str()); + exit(1); + } + vector iface_schemas = iface->get_property("Schemas"); + if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){ + fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str()); + exit(1); + } // Ensure that in liface, schema_id is used for only one schema - if(schema_of_schemaid.count(liface)==0){ - map empty_map; - schema_of_schemaid[liface] = empty_map; - } - if(schema_of_schemaid[liface].count(schema_id)==0){ - schema_of_schemaid[liface][schema_id] = schema_name; - }else{ - if(schema_of_schemaid[liface][schema_id] != schema_name){ - fprintf(stderr, "ERROR, For interface %s, schema id %d is used in schemas %s and %s\n", liface.c_str(), schema_id, schema_name.c_str(), schema_of_schemaid[liface][schema_id].c_str()); + if(schema_of_schemaid.count(liface)==0){ + map empty_map; + schema_of_schemaid[liface] = empty_map; + } + if(schema_of_schemaid[liface].count(schema_id)==0){ + schema_of_schemaid[liface][schema_id] = schema_name; + }else{ + if(schema_of_schemaid[liface][schema_id] != schema_name){ + fprintf(stderr, "ERROR, For interface %s, schema id %d is used in schemas %s and %s\n", liface.c_str(), schema_id, schema_name.c_str(), schema_of_schemaid[liface][schema_id].c_str()); + exit(1); + } + } + }else{ // single-schema interface + schema_id = -1; // don't generate schema_id predicate + vector iface_schemas = iface->get_property("Schemas"); + if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){ + fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str()); exit(1); } - } - }else{ // single-schema interface - schema_id = -1; // don't generate schema_id predicate - vector iface_schemas = iface->get_property("Schemas"); - if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){ - fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str()); - exit(1); - } - if(iface_schemas.size()>1){ - fprintf(stderr, "ERROR, interface %s is a single-schema interface, but has %d elements in its Schemas list.\n", liface.c_str(), (int)iface_schemas.size()); - exit(1); - } - } + if(iface_schemas.size()>1){ + fprintf(stderr, "ERROR, interface %s is a single-schema interface, but has %d elements in its Schemas list.\n", liface.c_str(), (int)iface_schemas.size()); + exit(1); + } + } // If we need to check the schema_id, insert a predicate into the lfta. // TODO not just schema_id, the full all_schema_ids set. - if(schema_id>=0){ - colref_t *schid_cr = new colref_t("schemaId"); - schid_cr->schema_ref = schema_ref; - schid_cr->tablevar_ref = 0; - scalarexp_t *schid_se = new scalarexp_t(schid_cr); - data_type *schid_dt = new data_type("uint"); - schid_se->dt = schid_dt; - - string schid_str = int_to_string(schema_id); - literal_t *schid_lit = new literal_t(schid_str.c_str(), LITERAL_INT); - scalarexp_t *lit_se = new scalarexp_t(schid_lit); - lit_se->dt = schid_dt; - - predicate_t *schid_pr = new predicate_t(schid_se, "=", lit_se); - vector clist; - make_cnf_from_pr(schid_pr, clist); - analyze_cnf(clist[0]); - clist[0]->cost = 1; // cheap one comparison + if(schema_id>=0){ + colref_t *schid_cr = new colref_t("schemaId"); + schid_cr->schema_ref = schema_ref; + schid_cr->table_name = rvar_name; + schid_cr->tablevar_ref = 0; + schid_cr->default_table = false; + scalarexp_t *schid_se = new scalarexp_t(schid_cr); + data_type *schid_dt = new data_type("uint"); + schid_se->dt = schid_dt; + + string schid_str = int_to_string(schema_id); + literal_t *schid_lit = new literal_t(schid_str.c_str(), LITERAL_INT); + scalarexp_t *lit_se = new scalarexp_t(schid_lit); + lit_se->dt = schid_dt; + + predicate_t *schid_pr = new predicate_t(schid_se, "=", lit_se); + vector clist; + make_cnf_from_pr(schid_pr, clist); + analyze_cnf(clist[0]); + clist[0]->cost = 1; // cheap one comparison // cnf built, now insert it. - split_queries[l]->query_plan[0]->append_to_where(clist[0]); + split_queries[l]->query_plan[0]->append_to_where(clist[0]); + +// Specialized processing ... currently filter join + string node_type = split_queries[l]->query_plan[0]->node_type(); + if(node_type == "filter_join"){ + filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0]; + if(irv==0){ + fj->pred_t0.push_back(clist[0]); + }else{ + fj->pred_t1.push_back(clist[0]); + } + schemaid_preds.push_back(schid_pr); + } + } + } +// Specialized processing, currently filter join. + if(schemaid_preds.size()>0){ + string node_type = split_queries[l]->query_plan[0]->node_type(); + if(node_type == "filter_join"){ + filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0]; + predicate_t *filter_pr = new predicate_t("OR", schemaid_preds[0], schemaid_preds[1]); + vector clist; + make_cnf_from_pr(filter_pr, clist); + analyze_cnf(clist[0]); + clist[0]->cost = 1; // cheap one comparison + fj->shared_pred.push_back(clist[0]); + } } @@ -1997,7 +2030,9 @@ for(q=0;qquery_plan[0]->bind_to_schema(Schema); + // split_queries[l]->query_plan[0]->definitions = split_queries[l]->defines; /* diff --git a/src/lib/gscphftaaux/flip_udaf.cc b/src/lib/gscphftaaux/flip_udaf.cc index a44f3e5..0252fde 100644 --- a/src/lib/gscphftaaux/flip_udaf.cc +++ b/src/lib/gscphftaaux/flip_udaf.cc @@ -226,7 +226,7 @@ gs_uint32_t extr_quant_hfta0_fcn(vstring *v, gs_float_t phi) //printf("nelts=%d t=%llx\n",vs->nelts, (unsigned long long int)(vs->t)); supertuple_t *t, *p; gs_uint32_t nelts=0; - gs_uint32_t rmin=0, rmax, rank, ropt=UINT_MAX; + gs_int32_t rmin=0, rmax, rank, ropt=INT_MAX; gs_uint32_t count=0; for (t=vs->t; t != NULL; t=t->next) { @@ -235,7 +235,7 @@ gs_uint32_t extr_quant_hfta0_fcn(vstring *v, gs_float_t phi) nelts += t->gap; count++; } - rank = (gs_uint32_t) (phi*(float)nelts); + rank = (gs_int32_t) (phi*(float)nelts); for (t=vs->t; t != NULL; t=t->next) { rmin += t->gap; @@ -389,14 +389,14 @@ gs_uint32_t extr_quant_hfta3_fcn(vstring *v, gs_float_t phi) quant_udaf_hfta3_struct_t *vs = (quant_udaf_hfta3_struct_t *)(v->offset); supertuple_t *t, *p; gs_uint32_t nelts=0; - gs_uint32_t rmin=0, rmax, rank, ropt=UINT_MAX; + gs_int32_t rmin=0, rmax, rank, ropt=INT_MAX; gs_uint32_t count=0; for (t=vs->t; t != NULL; t=t->next) { nelts += t->gap; count++; } - rank = (gs_uint32_t) (phi*(float)nelts); + rank = (gs_int32_t) (phi*(float)nelts); for (t=vs->t; t != NULL; t=t->next) { rmin += t->gap; -- 2.16.6