int k;
for(k=0;k<fs->hash_eq.size();++k){
sprintf(tmpstr,"key_var%d",k);
- ret += "\t"+fs->hash_eq[k]->pr->get_left_se()->get_data_type()->make_cvar(tmpstr)+";\n";
+ ret += "\t"+fs->hash_eq[k]->pr->get_right_se()->get_data_type()->make_cvar(tmpstr)+";\n";
}
ret += "\tlong long int ts;\n";
ret += "};\n\n";
// bloom filter needs to be advanced.
// SET_BF_EMPTY(table,number of bloom filters,bloom filter index,bit index)
// t->bf_size : number of bits in bloom filter
+// TODO: vectorize?
+// TODO: Don't iterate more than n_bloom times!
+// As written, it's possible to wrap around many times.
if(fs->use_bloom){
ret +=
"// Clean out old bloom filters if needed.\n"
+"// TODO vectorize this ? \n"
" if(t->first_exec){\n"
" t->first_exec = 0;\n"
" t->last_bin = (long long int)(curr_fj_ts/"+bloom_width_str+");\n"
;
}
}else{
+ ret += "// Add the S record to the hash table, choose a position\n";
ret += "\t\tbucket=0;\n";
for(p=0;p<fs->hash_eq.size();++p){
ret +=
ret += generate_equality_test(lhs_op,rhs_op,hdt);
}
ret += "){\n\t\t\tthe_bucket = bucket;\n";
- ret += "\t\t}else {if(";
+ ret += "\t\t}else{\n\t\t\tif(";
for(p=0;p<fs->hash_eq.size();++p){
if(p>0) ret += " && ";
// ret += "t->join_table[bucket1].key_var"+int_to_string(p)+
// " == s_equijoin_"+int_to_string(p);
data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
- string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p);
+ string lhs_op = "t->join_table[bucket1].key_var"+int_to_string(p);
string rhs_op = "s_equijoin_"+int_to_string(p);
ret += generate_equality_test(lhs_op,rhs_op,hdt);
}
- ret += "){\n\t\t\tthe_bucket = bucket1;\n";
- ret += "\t\t}else{ if(t->join_table[bucket].ts <= t->join_table[bucket1].ts)\n";
- ret+="\t\t\tthe_bucket = bucket;\n\t\t\telse the_bucket=bucket1;\n";
- ret += "\t\t}}\n";
+ ret += "){\n\t\t\t\tthe_bucket = bucket1;\n";
+ ret += "\t\t\t}else{ \n\t\t\t\tif(t->join_table[bucket].ts <= t->join_table[bucket1].ts)\n";
+ ret+="\t\t\t\t\tthe_bucket = bucket;\n\t\t\t\telse\n\t\t\t\t\tthe_bucket=bucket1;\n";
+ ret += "\t\t\t}\n\t\t}\n";
for(p=0;p<fs->hash_eq.size();++p){
data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
if(hdt->is_buffer_type()){
}
}
-// Sort S preds based on cost.
+// Sort R preds based on cost.
vector<cnf_elem *> tmp_wh;
for(w=0;w<r_filt.size();++w){
ret += "\n// Do the join\n\n";
for(p=0;p<fs->hash_eq.size();++p)
- ret += "\t\tr_equijoin_"+int_to_string(p)+" = "+generate_se_code(fs->hash_eq[p]->pr->get_left_se(),schema)+";\n";
+ ret += "\tr_equijoin_"+int_to_string(p)+" = "+generate_se_code(fs->hash_eq[p]->pr->get_left_se(),schema)+";\n";
// Passed the cheap pred, now test the join with S.
ret +=
" bucket"+int_to_string(i)+
" ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
- fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+
+ fs->hash_eq[p]->pr->get_left_se()->get_data_type()->get_type_str()+
+"_to_hash(r_equijoin_"+int_to_string(p)+"))>>32);\n";
}
ret +=
for(p=0;p<fs->hash_eq.size();++p){
ret +=
" bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
- fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+
+ fs->hash_eq[p]->pr->get_left_se()->get_data_type()->get_type_str()+
+"_to_hash(r_equijoin_"+int_to_string(p)+"))>>32);\n";
}
ret +=
// ret += "t->join_table[bucket1].key_var"+int_to_string(p)+
// " == r_equijoin_"+int_to_string(p);
data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
- string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p);
+ string lhs_op = "t->join_table[bucket1].key_var"+int_to_string(p);
string rhs_op = "s_equijoin_"+int_to_string(p);
ret += generate_equality_test(lhs_op,rhs_op,hdt);
}
bool uses_bloom = fjq->use_bloom;
ret += "/*\t\tJoin fields\t*/\n";
for(g=0;g<fjq->hash_eq.size();g++){
- sprintf(tmpstr,"\t%s s_equijoin_%d, r_equijoin_%d;\n",fjq->hash_eq[g]->pr->get_left_se()->get_data_type()->get_cvar_type().c_str(),g,g);
+ sprintf(tmpstr,"\t%s s_equijoin_%d;\n",fjq->hash_eq[g]->pr->get_right_se()->get_data_type()->get_cvar_type().c_str(),g);
+ ret += tmpstr;
+ sprintf(tmpstr,"\t%s r_equijoin_%d;\n",fjq->hash_eq[g]->pr->get_left_se()->get_data_type()->get_cvar_type().c_str(),g);
ret += tmpstr;
}
if(uses_bloom){
"\tunsigned int i=0,j=0,k=0, b, bf_clean = 0, tmp_i, found; \n"
"\tunsigned int bucket, bucket0, bucket1, bucket2;\n"
"\tlong long int curr_fj_ts;\n"
-"\tunsigned int curr_bin, the_bin;\n"
+"\tlong long int curr_bin, the_bin;\n"
"\n"
;
}else{
vector<tablevar_t *> tvec = split_queries[l]->query_plan[0]->get_input_tbls();
string liface = tvec[0]->get_interface(); // iface queries have been resolved
string lmach = tvec[0]->get_machine();
- string schema_name = tvec[0]->get_schema_name();
- int schema_ref = tvec[0]->get_schema_ref();
- if (lmach == "")
- lmach = hostname;
- interface_names.push_back(liface);
- machine_names.push_back(lmach);
+
+ vector<predicate_t *> schemaid_preds;
+ for(int irv=0;irv<tvec.size();++irv){
+
+ string schema_name = tvec[irv]->get_schema_name();
+ string rvar_name = tvec[irv]->get_var_name();
+ int schema_ref = tvec[irv]->get_schema_ref();
+ if (lmach == "")
+ lmach = hostname;
+ interface_names.push_back(liface);
+ machine_names.push_back(lmach);
//printf("Machine is %s\n",lmach.c_str());
// Check if a schemaId constraint needs to be inserted.
- if(schema_ref<0){ // can result from some kinds of splits
- schema_ref = Schema->get_table_ref(schema_name);
- }
- int schema_id = Schema->get_schema_id(schema_ref); // id associated with PROTOCOL
- int errnum = 0;
- string if_error;
- iface_t *iface = ifaces_db-> get_interface(lmach, liface, errnum, if_error);
- if(iface==NULL){
- fprintf(stderr,"Error looking up the interface %s on host %s: %s\n",liface.c_str(), lmach.c_str(), if_error.c_str());
- exit(1);
- }
- if(iface->has_multiple_schemas()){
- if(schema_id<0){ // invalid schema_id
- fprintf(stderr,"Error, schema %s has no schema_id, but is sourced from multi-schema interface %s (ref'd in query %s)\n", schema_name.c_str(), liface.c_str(), split_queries[0]->query_name.c_str());
- exit(1);
+ if(schema_ref<0){ // can result from some kinds of splits
+ schema_ref = Schema->get_table_ref(schema_name);
}
- vector<string> iface_schemas = iface->get_property("Schemas");
- if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
- fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
+ int schema_id = Schema->get_schema_id(schema_ref); // id associated with PROTOCOL
+ int errnum = 0;
+ string if_error;
+ iface_t *iface = ifaces_db-> get_interface(lmach, liface, errnum, if_error);
+ if(iface==NULL){
+ fprintf(stderr,"Error looking up the interface %s on host %s: %s\n",liface.c_str(), lmach.c_str(), if_error.c_str());
exit(1);
- }
+ }
+ if(iface->has_multiple_schemas()){
+ if(schema_id<0){ // invalid schema_id
+ fprintf(stderr,"Error, schema %s has no schema_id, but is sourced from multi-schema interface %s (ref'd in query %s)\n", schema_name.c_str(), liface.c_str(), split_queries[0]->query_name.c_str());
+ exit(1);
+ }
+ vector<string> iface_schemas = iface->get_property("Schemas");
+ if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
+ fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
+ exit(1);
+ }
// Ensure that in liface, schema_id is used for only one schema
- if(schema_of_schemaid.count(liface)==0){
- map<int, string> empty_map;
- schema_of_schemaid[liface] = empty_map;
- }
- if(schema_of_schemaid[liface].count(schema_id)==0){
- schema_of_schemaid[liface][schema_id] = schema_name;
- }else{
- if(schema_of_schemaid[liface][schema_id] != schema_name){
- fprintf(stderr, "ERROR, For interface %s, schema id %d is used in schemas %s and %s\n", liface.c_str(), schema_id, schema_name.c_str(), schema_of_schemaid[liface][schema_id].c_str());
+ if(schema_of_schemaid.count(liface)==0){
+ map<int, string> empty_map;
+ schema_of_schemaid[liface] = empty_map;
+ }
+ if(schema_of_schemaid[liface].count(schema_id)==0){
+ schema_of_schemaid[liface][schema_id] = schema_name;
+ }else{
+ if(schema_of_schemaid[liface][schema_id] != schema_name){
+ fprintf(stderr, "ERROR, For interface %s, schema id %d is used in schemas %s and %s\n", liface.c_str(), schema_id, schema_name.c_str(), schema_of_schemaid[liface][schema_id].c_str());
+ exit(1);
+ }
+ }
+ }else{ // single-schema interface
+ schema_id = -1; // don't generate schema_id predicate
+ vector<string> iface_schemas = iface->get_property("Schemas");
+ if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
+ fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
exit(1);
}
- }
- }else{ // single-schema interface
- schema_id = -1; // don't generate schema_id predicate
- vector<string> iface_schemas = iface->get_property("Schemas");
- if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
- fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
- exit(1);
- }
- if(iface_schemas.size()>1){
- fprintf(stderr, "ERROR, interface %s is a single-schema interface, but has %d elements in its Schemas list.\n", liface.c_str(), (int)iface_schemas.size());
- exit(1);
- }
- }
+ if(iface_schemas.size()>1){
+ fprintf(stderr, "ERROR, interface %s is a single-schema interface, but has %d elements in its Schemas list.\n", liface.c_str(), (int)iface_schemas.size());
+ exit(1);
+ }
+ }
// If we need to check the schema_id, insert a predicate into the lfta.
// TODO not just schema_id, the full all_schema_ids set.
- if(schema_id>=0){
- colref_t *schid_cr = new colref_t("schemaId");
- schid_cr->schema_ref = schema_ref;
- schid_cr->tablevar_ref = 0;
- scalarexp_t *schid_se = new scalarexp_t(schid_cr);
- data_type *schid_dt = new data_type("uint");
- schid_se->dt = schid_dt;
-
- string schid_str = int_to_string(schema_id);
- literal_t *schid_lit = new literal_t(schid_str.c_str(), LITERAL_INT);
- scalarexp_t *lit_se = new scalarexp_t(schid_lit);
- lit_se->dt = schid_dt;
-
- predicate_t *schid_pr = new predicate_t(schid_se, "=", lit_se);
- vector<cnf_elem *> clist;
- make_cnf_from_pr(schid_pr, clist);
- analyze_cnf(clist[0]);
- clist[0]->cost = 1; // cheap one comparison
+ if(schema_id>=0){
+ colref_t *schid_cr = new colref_t("schemaId");
+ schid_cr->schema_ref = schema_ref;
+ schid_cr->table_name = rvar_name;
+ schid_cr->tablevar_ref = 0;
+ schid_cr->default_table = false;
+ scalarexp_t *schid_se = new scalarexp_t(schid_cr);
+ data_type *schid_dt = new data_type("uint");
+ schid_se->dt = schid_dt;
+
+ string schid_str = int_to_string(schema_id);
+ literal_t *schid_lit = new literal_t(schid_str.c_str(), LITERAL_INT);
+ scalarexp_t *lit_se = new scalarexp_t(schid_lit);
+ lit_se->dt = schid_dt;
+
+ predicate_t *schid_pr = new predicate_t(schid_se, "=", lit_se);
+ vector<cnf_elem *> clist;
+ make_cnf_from_pr(schid_pr, clist);
+ analyze_cnf(clist[0]);
+ clist[0]->cost = 1; // cheap one comparison
// cnf built, now insert it.
- split_queries[l]->query_plan[0]->append_to_where(clist[0]);
+ split_queries[l]->query_plan[0]->append_to_where(clist[0]);
+
+// Specialized processing ... currently filter join
+ string node_type = split_queries[l]->query_plan[0]->node_type();
+ if(node_type == "filter_join"){
+ filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
+ if(irv==0){
+ fj->pred_t0.push_back(clist[0]);
+ }else{
+ fj->pred_t1.push_back(clist[0]);
+ }
+ schemaid_preds.push_back(schid_pr);
+ }
+ }
+ }
+// Specialized processing, currently filter join.
+ if(schemaid_preds.size()>0){
+ string node_type = split_queries[l]->query_plan[0]->node_type();
+ if(node_type == "filter_join"){
+ filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
+ predicate_t *filter_pr = new predicate_t("OR", schemaid_preds[0], schemaid_preds[1]);
+ vector<cnf_elem *> clist;
+ make_cnf_from_pr(filter_pr, clist);
+ analyze_cnf(clist[0]);
+ clist[0]->cost = 1; // cheap one comparison
+ fj->shared_pred.push_back(clist[0]);
+ }
}
// The following is a hack,
// as I should be generating LFTA code through
// the stream_query object.
+
split_queries[l]->query_plan[0]->bind_to_schema(Schema);
+
// split_queries[l]->query_plan[0]->definitions = split_queries[l]->defines;
/*