vector<tablevar_t *> tvec = split_queries[l]->query_plan[0]->get_input_tbls();
string liface = tvec[0]->get_interface(); // iface queries have been resolved
string lmach = tvec[0]->get_machine();
- string schema_name = tvec[0]->get_schema_name();
- int schema_ref = tvec[0]->get_schema_ref();
- if (lmach == "")
- lmach = hostname;
- interface_names.push_back(liface);
- machine_names.push_back(lmach);
+
+ vector<predicate_t *> schemaid_preds;
+ for(int irv=0;irv<tvec.size();++irv){
+
+ string schema_name = tvec[irv]->get_schema_name();
+ string rvar_name = tvec[irv]->get_var_name();
+ int schema_ref = tvec[irv]->get_schema_ref();
+ if (lmach == "")
+ lmach = hostname;
+ interface_names.push_back(liface);
+ machine_names.push_back(lmach);
//printf("Machine is %s\n",lmach.c_str());
// Check if a schemaId constraint needs to be inserted.
- if(schema_ref<0){ // can result from some kinds of splits
- schema_ref = Schema->get_table_ref(schema_name);
- }
- int schema_id = Schema->get_schema_id(schema_ref); // id associated with PROTOCOL
- int errnum = 0;
- string if_error;
- iface_t *iface = ifaces_db-> get_interface(lmach, liface, errnum, if_error);
- if(iface==NULL){
- fprintf(stderr,"Error looking up the interface %s on host %s: %s\n",liface.c_str(), lmach.c_str(), if_error.c_str());
- exit(1);
- }
- if(iface->has_multiple_schemas()){
- if(schema_id<0){ // invalid schema_id
- fprintf(stderr,"Error, schema %s has no schema_id, but is sourced from multi-schema interface %s (ref'd in query %s)\n", schema_name.c_str(), liface.c_str(), split_queries[0]->query_name.c_str());
- exit(1);
+ if(schema_ref<0){ // can result from some kinds of splits
+ schema_ref = Schema->get_table_ref(schema_name);
}
- vector<string> iface_schemas = iface->get_property("Schemas");
- if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
- fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
+ int schema_id = Schema->get_schema_id(schema_ref); // id associated with PROTOCOL
+ int errnum = 0;
+ string if_error;
+ iface_t *iface = ifaces_db-> get_interface(lmach, liface, errnum, if_error);
+ if(iface==NULL){
+ fprintf(stderr,"Error looking up the interface %s on host %s: %s\n",liface.c_str(), lmach.c_str(), if_error.c_str());
exit(1);
- }
+ }
+ if(iface->has_multiple_schemas()){
+ if(schema_id<0){ // invalid schema_id
+ fprintf(stderr,"Error, schema %s has no schema_id, but is sourced from multi-schema interface %s (ref'd in query %s)\n", schema_name.c_str(), liface.c_str(), split_queries[0]->query_name.c_str());
+ exit(1);
+ }
+ vector<string> iface_schemas = iface->get_property("Schemas");
+ if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
+ fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
+ exit(1);
+ }
// Ensure that in liface, schema_id is used for only one schema
- if(schema_of_schemaid.count(liface)==0){
- map<int, string> empty_map;
- schema_of_schemaid[liface] = empty_map;
- }
- if(schema_of_schemaid[liface].count(schema_id)==0){
- schema_of_schemaid[liface][schema_id] = schema_name;
- }else{
- if(schema_of_schemaid[liface][schema_id] != schema_name){
- fprintf(stderr, "ERROR, For interface %s, schema id %d is used in schemas %s and %s\n", liface.c_str(), schema_id, schema_name.c_str(), schema_of_schemaid[liface][schema_id].c_str());
+ if(schema_of_schemaid.count(liface)==0){
+ map<int, string> empty_map;
+ schema_of_schemaid[liface] = empty_map;
+ }
+ if(schema_of_schemaid[liface].count(schema_id)==0){
+ schema_of_schemaid[liface][schema_id] = schema_name;
+ }else{
+ if(schema_of_schemaid[liface][schema_id] != schema_name){
+ fprintf(stderr, "ERROR, For interface %s, schema id %d is used in schemas %s and %s\n", liface.c_str(), schema_id, schema_name.c_str(), schema_of_schemaid[liface][schema_id].c_str());
+ exit(1);
+ }
+ }
+ }else{ // single-schema interface
+ schema_id = -1; // don't generate schema_id predicate
+ vector<string> iface_schemas = iface->get_property("Schemas");
+ if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
+ fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
exit(1);
}
- }
- }else{ // single-schema interface
- schema_id = -1; // don't generate schema_id predicate
- vector<string> iface_schemas = iface->get_property("Schemas");
- if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
- fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
- exit(1);
- }
- if(iface_schemas.size()>1){
- fprintf(stderr, "ERROR, interface %s is a single-schema interface, but has %d elements in its Schemas list.\n", liface.c_str(), (int)iface_schemas.size());
- exit(1);
- }
- }
+ if(iface_schemas.size()>1){
+ fprintf(stderr, "ERROR, interface %s is a single-schema interface, but has %d elements in its Schemas list.\n", liface.c_str(), (int)iface_schemas.size());
+ exit(1);
+ }
+ }
// If we need to check the schema_id, insert a predicate into the lfta.
// TODO not just schema_id, the full all_schema_ids set.
- if(schema_id>=0){
- colref_t *schid_cr = new colref_t("schemaId");
- schid_cr->schema_ref = schema_ref;
- schid_cr->tablevar_ref = 0;
- scalarexp_t *schid_se = new scalarexp_t(schid_cr);
- data_type *schid_dt = new data_type("uint");
- schid_se->dt = schid_dt;
-
- string schid_str = int_to_string(schema_id);
- literal_t *schid_lit = new literal_t(schid_str.c_str(), LITERAL_INT);
- scalarexp_t *lit_se = new scalarexp_t(schid_lit);
- lit_se->dt = schid_dt;
-
- predicate_t *schid_pr = new predicate_t(schid_se, "=", lit_se);
- vector<cnf_elem *> clist;
- make_cnf_from_pr(schid_pr, clist);
- analyze_cnf(clist[0]);
- clist[0]->cost = 1; // cheap one comparison
+ if(schema_id>=0){
+ colref_t *schid_cr = new colref_t("schemaId");
+ schid_cr->schema_ref = schema_ref;
+ schid_cr->table_name = rvar_name;
+ schid_cr->tablevar_ref = 0;
+ schid_cr->default_table = false;
+ scalarexp_t *schid_se = new scalarexp_t(schid_cr);
+ data_type *schid_dt = new data_type("uint");
+ schid_se->dt = schid_dt;
+
+ string schid_str = int_to_string(schema_id);
+ literal_t *schid_lit = new literal_t(schid_str.c_str(), LITERAL_INT);
+ scalarexp_t *lit_se = new scalarexp_t(schid_lit);
+ lit_se->dt = schid_dt;
+
+ predicate_t *schid_pr = new predicate_t(schid_se, "=", lit_se);
+ vector<cnf_elem *> clist;
+ make_cnf_from_pr(schid_pr, clist);
+ analyze_cnf(clist[0]);
+ clist[0]->cost = 1; // cheap one comparison
// cnf built, now insert it.
- split_queries[l]->query_plan[0]->append_to_where(clist[0]);
+ split_queries[l]->query_plan[0]->append_to_where(clist[0]);
+
+// Specialized processing ... currently filter join
+ string node_type = split_queries[l]->query_plan[0]->node_type();
+ if(node_type == "filter_join"){
+ filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
+ if(irv==0){
+ fj->pred_t0.push_back(clist[0]);
+ }else{
+ fj->pred_t1.push_back(clist[0]);
+ }
+ schemaid_preds.push_back(schid_pr);
+ }
+ }
+ }
+// Specialized processing, currently filter join.
+ if(schemaid_preds.size()>0){
+ string node_type = split_queries[l]->query_plan[0]->node_type();
+ if(node_type == "filter_join"){
+ filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
+ predicate_t *filter_pr = new predicate_t("OR", schemaid_preds[0], schemaid_preds[1]);
+ vector<cnf_elem *> clist;
+ make_cnf_from_pr(filter_pr, clist);
+ analyze_cnf(clist[0]);
+ clist[0]->cost = 1; // cheap one comparison
+ fj->shared_pred.push_back(clist[0]);
+ }
}
// THe following is a hack,
// as I should be generating LFTA code through
// the stream_query object.
+
split_queries[l]->query_plan[0]->bind_to_schema(Schema);
+
// split_queries[l]->query_plan[0]->definitions = split_queries[l]->defines;
/*