}
Schema = fta_parse_result->tables;
+// Ensure that all schema_ids, if set, are distinct.
+// Obsolete? There is code elsewhere to ensure that schema IDs are
+// distinct on a per-interface basis.
+/*
+ set<int> found_ids;
+ set<int> dup_ids;
+ for(int t=0;t<Schema->size();++t){
+ int sch_id = Schema->get_table(t)->get_schema_id();
+ if(sch_id> 0){
+ if(found_ids.find(sch_id) != found_ids.end()){
+ dup_ids.insert(sch_id);
+ }else{
+ found_ids.insert(sch_id);
+ }
+ }
+ if(dup_ids.size()>0){
+ fprintf(stderr, "Error, the schema has duplicate schema_ids:");
+ for(auto dit=dup_ids.begin();dit!=dup_ids.end(); ++dit)
+ fprintf(stderr," %d",(*dit));
+ fprintf(stderr,"\n");
+ exit(1);
+ }
+ }
+*/
+
+
// Process schema field inheritance
int retval;
retval = Schema->unroll_tables(err_str);
if(retval){
- fprintf(stderr,"Error processing schema filed inheritance:\n %s\n", err_str.c_str() );
+ fprintf(stderr,"Error processing schema field inheritance:\n %s\n", err_str.c_str() );
exit(1);
}
}else{
// TODO: separate building operators from spliting lftas,
// that will make optimizations such as predicate pushing easier.
vector<stream_query *> lfta_list;
-
stream_query *rootq;
-
int qi,qj;
+ map<string, map<int, string> > schema_of_schemaid; // ensure that schema IDs are unique to an <interface, schema>
+
for(qi=hfta_topsort.size()-1;qi>=0;--qi){
int hfta_id = hfta_topsort[qi];
// Compute the number of LFTAs.
int n_lfta = split_queries.size();
if(hfta_returned) n_lfta--;
-
+// Check if a schemaId constraint needs to be inserted.
// Process the LFTA components.
for(l=0;l<n_lfta;++l){
if(lfta_names.count(split_queries[l]->query_name) == 0){
// Grab the lfta for global optimization.
vector<tablevar_t *> tvec = split_queries[l]->query_plan[0]->get_input_tbls();
- string liface = tvec[0]->get_interface();
+ string liface = tvec[0]->get_interface(); // iface queries have been resolved
string lmach = tvec[0]->get_machine();
+ string schema_name = tvec[0]->get_schema_name();
+ int schema_ref = tvec[0]->get_schema_ref();
if (lmach == "")
lmach = hostname;
interface_names.push_back(liface);
machine_names.push_back(lmach);
//printf("Machine is %s\n",lmach.c_str());
+// Check if a schemaId constraint needs to be inserted.
+ if(schema_ref<0){ // can result from some kinds of splits
+ schema_ref = Schema->get_table_ref(schema_name);
+ }
+ int schema_id = Schema->get_schema_id(schema_ref); // id associated with PROTOCOL
+ int errnum = 0;
+ string if_error;
+ iface_t *iface = ifaces_db-> get_interface(lmach, liface, errnum, if_error);
+ if(iface==NULL){
+ fprintf(stderr,"Error looking up the interface %s on host %s: %s\n",liface.c_str(), lmach.c_str(), if_error.c_str());
+ exit(1);
+ }
+ if(iface->has_multiple_schemas()){
+ if(schema_id<0){ // invalid schema_id
+ fprintf(stderr,"Error, schema %s has no schema_id, but is sourced from multi-schema interface %s (ref'd in query %s)\n", schema_name.c_str(), liface.c_str(), split_queries[0]->query_name.c_str());
+ exit(1);
+ }
+ vector<string> iface_schemas = iface->get_property("Schemas");
+ if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
+ fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
+ exit(1);
+ }
+// Ensure that in liface, schema_id is used for only one schema
+ if(schema_of_schemaid.count(liface)==0){
+ map<int, string> empty_map;
+ schema_of_schemaid[liface] = empty_map;
+ }
+ if(schema_of_schemaid[liface].count(schema_id)==0){
+ schema_of_schemaid[liface][schema_id] = schema_name;
+ }else{
+ if(schema_of_schemaid[liface][schema_id] != schema_name){
+ fprintf(stderr, "ERROR, For interface %s, schema id %d is used in schemas %s and %s\n", liface.c_str(), schema_id, schema_name.c_str(), schema_of_schemaid[liface][schema_id].c_str());
+ exit(1);
+ }
+ }
+ }else{ // single-schema interface
+ schema_id = -1; // don't generate schema_id predicate
+ vector<string> iface_schemas = iface->get_property("Schemas");
+ if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
+ fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
+ exit(1);
+ }
+ if(iface_schemas.size()>1){
+ fprintf(stderr, "ERROR, interface %s is a single-schema interface, but has %d elements in its Schemas list.\n", liface.c_str(), (int)iface_schemas.size());
+ exit(1);
+ }
+ }
+
+// If we need to check the schema_id, insert a predicate into the lfta.
+// TODO not just schema_id, the full all_schema_ids set.
+ if(schema_id>=0){
+ colref_t *schid_cr = new colref_t("schemaId");
+ schid_cr->schema_ref = schema_ref;
+ schid_cr->tablevar_ref = 0;
+ scalarexp_t *schid_se = new scalarexp_t(schid_cr);
+ data_type *schid_dt = new data_type("uint");
+ schid_se->dt = schid_dt;
+
+ string schid_str = int_to_string(schema_id);
+ literal_t *schid_lit = new literal_t(schid_str.c_str(), LITERAL_INT);
+ scalarexp_t *lit_se = new scalarexp_t(schid_lit);
+ lit_se->dt = schid_dt;
+
+ predicate_t *schid_pr = new predicate_t(schid_se, "=", lit_se);
+ vector<cnf_elem *> clist;
+ make_cnf_from_pr(schid_pr, clist);
+ analyze_cnf(clist[0]);
+ clist[0]->cost = 1; // cheap one comparison
+// cnf built, now insert it.
+ split_queries[l]->query_plan[0]->append_to_where(clist[0]);
+ }
+
+
+
+
+
+
+
// Set the ht size from the recommendation, if there is one in the rec file
if(lfta_htsize.count(split_queries[l]->query_name)>0){
split_queries[l]->query_plan[0]->set_definition("aggregate_slots",int_to_string(lfta_htsize[split_queries[l]->query_name]));
hfta_namespace = hfta_list[i]->defines["namespace"];
if(hfta_list[i]->defines.count("Namespace")>0)
hfta_namespace = hfta_list[i]->defines["Namespace"];
- if(hfta_list[i]->defines.count("Namespace")>0)
- hfta_namespace = hfta_list[i]->defines["Namespace"];
+ if(hfta_list[i]->defines.count("NAMESPACE")>0)
+ hfta_namespace = hfta_list[i]->defines["NAMESPACE"];
if(field_verifier != NULL){
string warning_str;
if(hfta_comment == "")
warning_str += "\tcomment not found.\n";
- if(hfta_title == "")
- warning_str += "\ttitle not found.\n";
- if(hfta_namespace == "")
- warning_str += "\tnamespace not found.\n";
+
+// Obsolete stuff that Carsten wanted
+// if(hfta_title == "")
+// warning_str += "\ttitle not found.\n";
+// if(hfta_namespace == "")
+// warning_str += "\tnamespace not found.\n";
+
+// STOPPED HERE
+// There is a get_tbl_keys method implemented for qp_nodes,
+// integrate it into steam_query, then call it to find keys,
+// and annotate feidls with their key-ness.
+// If there is a "keys" proprty in the defines block, override anything returned
+// from the automated analysis
vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
int fi;
hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(),warning_str.c_str());
}
+// Get the fields in this query
+ vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
+
+// do key processing
+ string hfta_keys_s = "";
+ if(hfta_list[i]->defines.count("keys")>0)
+ hfta_keys_s = hfta_list[i]->defines["keys"];
+ if(hfta_list[i]->defines.count("Keys")>0)
+ hfta_keys_s = hfta_list[i]->defines["Keys"];
+ if(hfta_list[i]->defines.count("KEYS")>0)
+ hfta_keys_s = hfta_list[i]->defines["KEYS"];
+ string xtra_keys_s = "";
+ if(hfta_list[i]->defines.count("extra_keys")>0)
+ xtra_keys_s = hfta_list[i]->defines["extra_keys"];
+ if(hfta_list[i]->defines.count("Extra_Keys")>0)
+ xtra_keys_s = hfta_list[i]->defines["Extra_Keys"];
+ if(hfta_list[i]->defines.count("EXTRA_KEYS")>0)
+ xtra_keys_s = hfta_list[i]->defines["EXTRA_KEYS"];
+// get the keys
+ vector<string> hfta_keys;
+ vector<string> partial_keys;
+ vector<string> xtra_keys;
+ if(hfta_keys_s==""){
+ hfta_keys = hfta_list[i]->get_tbl_keys(partial_keys);
+ if(xtra_keys_s.size()>0){
+ xtra_keys = split_string(xtra_keys_s, ',');
+ }
+ for(int xi=0;xi<xtra_keys.size();++xi){
+ if(find(hfta_keys.begin(), hfta_keys.end(), xtra_keys[xi]) == hfta_keys.end()){
+ hfta_keys.push_back(xtra_keys[xi]);
+ }
+ }
+ }else{
+ hfta_keys = split_string(hfta_keys_s, ',');
+ }
+// validate that all of the keys exist in the output.
+// (exit on error, as its a bad specificiation)
+ vector<string> missing_keys;
+ for(int ki=0;ki<hfta_keys.size(); ++ki){
+ int fi;
+ for(fi=0;fi<flds.size();++fi){
+ if(hfta_keys[ki] == flds[fi]->get_name())
+ break;
+ }
+ if(fi==flds.size())
+ missing_keys.push_back(hfta_keys[ki]);
+ }
+ if(missing_keys.size()>0){
+ fprintf(stderr, "Error, HFTA stream %s has keys defined which aren't in the outputg:", hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
+ for(int hi=0; hi<missing_keys.size(); ++hi){
+ fprintf(stderr," %s", missing_keys[hi].c_str());
+ }
+ fprintf(stderr,"\n");
+ exit(1);
+ }
+
fprintf(qtree_output,"\t<HFTA name='%s'>\n",hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
if(hfta_comment != "")
fprintf(qtree_output,"\t\t<Description value='%s' />\n",hfta_comment.c_str());
fprintf(qtree_output,"\t\t<Rate value='100' />\n");
// write info about fields to qtree.xml
- vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
int fi;
for(fi=0;fi<flds.size();fi++){
fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
if(flds[fi]->get_modifier_list()->size()){
- fprintf(qtree_output,"mods='%s' ", flds[fi]->get_modifier_list()->to_string().c_str());
+ fprintf(qtree_output,"mods='%s'", flds[fi]->get_modifier_list()->to_string().c_str());
}
fprintf(qtree_output," />\n");
}
+// info about keys
+ for(int hi=0;hi<hfta_keys.size();++hi){
+ fprintf(qtree_output, "\t\t<Key value='%s'/>\n",hfta_keys[hi].c_str());
+ }
+ for(int hi=0;hi<partial_keys.size();++hi){
+ fprintf(qtree_output, "\t\t<PartialKey value='%s'/>\n",partial_keys[hi].c_str());
+ }
+ for(int hi=0;hi<xtra_keys.size();++hi){
+ fprintf(qtree_output, "\t\t<ExtraKey value='%s'/>\n",xtra_keys[hi].c_str());
+ }
+
// extract liveness timeout from query definition
int liveness_timeout = atoi(hfta_list[i]->query_plan[hfta_list[i]->qhead]->get_val_of_def("liveness_timeout").c_str());
// combine all values for the interface property using comma separator
vector<string> vals = ifaces_db->get_iface_vals(lmach,*sir,iface_properties[i],erri,err_str);
+ lfta_val[lmach] += "\t\t\treturn \"";
for (int j = 0; j < vals.size(); ++j) {
- lfta_val[lmach] += "\t\t\treturn \"" + vals[j];
+ lfta_val[lmach] += vals[j];
if (j != vals.size()-1)
lfta_val[lmach] += ",";
}
string warning_str;
if(lfta_comment == "")
warning_str += "\tcomment not found.\n";
- if(lfta_title == "")
- warning_str += "\ttitle not found.\n";
- if(lfta_namespace == "")
- warning_str += "\tnamespace not found.\n";
+// Obsolete stuff that carsten wanted
+// if(lfta_title == "")
+// warning_str += "\ttitle not found.\n";
+// if(lfta_namespace == "")
+// warning_str += "\tnamespace not found.\n";
vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
int fi;
}
// Do we need to include protobuf libraries?
+// TODO Move to the interface library: get the libraries to include
+// for an interface type
+
bool use_proto = false;
int erri;
string err_str;