+// Check if a schemaId constraint needs to be inserted.
+ if(schema_ref<0){ // can result from some kinds of splits
+ schema_ref = Schema->get_table_ref(schema_name);
+ }
+ int schema_id = Schema->get_schema_id(schema_ref); // id associated with PROTOCOL
+ int errnum = 0;
+ string if_error;
+ iface_t *iface = ifaces_db-> get_interface(lmach, liface, errnum, if_error);
+ if(iface==NULL){
+ fprintf(stderr,"Error looking up the interface %s on host %s: %s\n",liface.c_str(), lmach.c_str(), if_error.c_str());
+ exit(1);
+ }
+
+
+ if(tvec[irv]->get_interface() != "_local_"){
+ if(iface->has_multiple_schemas()){
+ if(schema_id<0){ // invalid schema_id
+ fprintf(stderr,"Error, schema %s has no schema_id, but is sourced from multi-schema interface %s (ref'd in query %s)\n", schema_name.c_str(), liface.c_str(), split_queries[0]->query_name.c_str());
+ exit(1);
+ }
+ vector<string> iface_schemas = iface->get_property("Schemas");
+ if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
+ fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
+ exit(1);
+ }
+// Ensure that in liface, schema_id is used for only one schema
+ if(schema_of_schemaid.count(liface)==0){
+ map<int, string> empty_map;
+ schema_of_schemaid[liface] = empty_map;
+ }
+ if(schema_of_schemaid[liface].count(schema_id)==0){
+ schema_of_schemaid[liface][schema_id] = schema_name;
+ }else{
+ if(schema_of_schemaid[liface][schema_id] != schema_name){
+ fprintf(stderr, "ERROR, For interface %s, schema id %d is used in schemas %s and %s\n", liface.c_str(), schema_id, schema_name.c_str(), schema_of_schemaid[liface][schema_id].c_str());
+ exit(1);
+ }
+ }
+ }else{ // single-schema interface
+ schema_id = -1; // don't generate schema_id predicate
+ vector<string> iface_schemas = iface->get_property("Schemas");
+ if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
+ fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
+ exit(1);
+ }
+ if(iface_schemas.size()>1){
+ fprintf(stderr, "ERROR, interface %s is a single-schema interface, but has %d elements in its Schemas list.\n", liface.c_str(), (int)iface_schemas.size());
+ exit(1);
+ }
+ }
+ }else{
+ schema_id = -1;
+ }
+
+// If we need to check the schema_id, insert a predicate into the lfta.
+// TODO not just schema_id, the full all_schema_ids set.
+ if(schema_id>=0){
+ colref_t *schid_cr = new colref_t("schemaId");
+ schid_cr->schema_ref = schema_ref;
+ schid_cr->table_name = rvar_name;
+ schid_cr->tablevar_ref = 0;
+ schid_cr->default_table = false;
+ scalarexp_t *schid_se = new scalarexp_t(schid_cr);
+ data_type *schid_dt = new data_type("uint");
+ schid_se->dt = schid_dt;
+
+ string schid_str = int_to_string(schema_id);
+ literal_t *schid_lit = new literal_t(schid_str.c_str(), LITERAL_INT);
+ scalarexp_t *lit_se = new scalarexp_t(schid_lit);
+ lit_se->dt = schid_dt;
+
+ predicate_t *schid_pr = new predicate_t(schid_se, "=", lit_se);
+ vector<cnf_elem *> clist;
+ make_cnf_from_pr(schid_pr, clist);
+ analyze_cnf(clist[0]);
+ clist[0]->cost = 1; // cheap one comparison
+// cnf built, now insert it.
+ split_queries[l]->query_plan[0]->append_to_where(clist[0]);
+
+// Specialized processing
+// filter join, get two schemaid preds
+ string node_type = split_queries[l]->query_plan[0]->node_type();
+ if(node_type == "filter_join"){
+ filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
+ if(irv==0){
+ fj->pred_t0.push_back(clist[0]);
+ }else{
+ fj->pred_t1.push_back(clist[0]);
+ }
+ schemaid_preds.push_back(schid_pr);
+ }
+// watchlist join, get the first schemaid pred
+ if(node_type == "watch_join"){
+ watch_join_qpn *fj = (watch_join_qpn *)split_queries[l]->query_plan[0];
+ if(irv==0){
+ fj->pred_t0.push_back(clist[0]);
+ schemaid_preds.push_back(schid_pr);
+ }
+ }
+ }
+ }
+// Specialized processing, currently filter join.
+ if(schemaid_preds.size()>1){
+ string node_type = split_queries[l]->query_plan[0]->node_type();
+ if(node_type == "filter_join"){
+ filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
+ predicate_t *filter_pr = new predicate_t("OR", schemaid_preds[0], schemaid_preds[1]);
+ vector<cnf_elem *> clist;
+ make_cnf_from_pr(filter_pr, clist);
+ analyze_cnf(clist[0]);
+ clist[0]->cost = 1; // cheap one comparison
+ fj->shared_pred.push_back(clist[0]);
+ }
+ }
+
+
+
+
+
+
+