Add support for query key extraction
[com/gs-lite.git] / src / ftacmp / translate_fta.cc
index 4a1263f..091fafe 100644 (file)
@@ -555,11 +555,37 @@ int main(int argc, char **argv){
          }
          Schema = fta_parse_result->tables;
 
+//                     Ensure that all schema_ids, if set, are distinct.
+//  Obsolete? There is code elsewhere to ensure that schema IDs are
+//  distinct on a per-interface basis.
+/*
+         set<int> found_ids;
+         set<int> dup_ids;
+         for(int t=0;t<Schema->size();++t){
+               int sch_id = Schema->get_table(t)->get_schema_id();
+               if(sch_id> 0){
+                       if(found_ids.find(sch_id) != found_ids.end()){
+                               dup_ids.insert(sch_id);
+                       }else{
+                               found_ids.insert(sch_id);
+                       }
+               }
+               if(dup_ids.size()>0){
+                       fprintf(stderr, "Error, the schema has duplicate schema_ids:");
+                       for(auto dit=dup_ids.begin();dit!=dup_ids.end(); ++dit)
+                               fprintf(stderr," %d",(*dit));
+                       fprintf(stderr,"\n");
+                       exit(1);
+               }
+         }
+*/
+
+
 //                     Process schema field inheritance
          int retval;
          retval = Schema->unroll_tables(err_str);
          if(retval){
-               fprintf(stderr,"Error processing schema filed inheritance:\n %s\n", err_str.c_str() );
+               fprintf(stderr,"Error processing schema field inheritance:\n %s\n", err_str.c_str() );
                exit(1);
          }
        }else{
@@ -1625,11 +1651,11 @@ for(q=0;q<hfta_sets.size();++q){
 //                             TODO: separate building operators from spliting lftas,
 //                                     that will make optimizations such as predicate pushing easier.
        vector<stream_query *> lfta_list;
-
        stream_query *rootq;
-
     int qi,qj;
 
+       map<string, map<int, string> > schema_of_schemaid; // ensure that schema IDs are unique to an <interface, schema>
+
        for(qi=hfta_topsort.size()-1;qi>=0;--qi){
 
        int hfta_id = hfta_topsort[qi];
@@ -1862,21 +1888,101 @@ for(q=0;q<hfta_sets.size();++q){
 //                             Compute the number of LFTAs.
          int n_lfta = split_queries.size();
          if(hfta_returned) n_lfta--;
-
+//                             Check if a schemaId constraint needs to be inserted.
 
 //                             Process the LFTA components.
          for(l=0;l<n_lfta;++l){
           if(lfta_names.count(split_queries[l]->query_name) == 0){
 //                             Grab the lfta for global optimization.
                vector<tablevar_t *> tvec =  split_queries[l]->query_plan[0]->get_input_tbls();
-               string liface = tvec[0]->get_interface();
+               string liface = tvec[0]->get_interface();       // iface queries have been resolved
                string lmach = tvec[0]->get_machine();
+               string schema_name = tvec[0]->get_schema_name();
+               int schema_ref = tvec[0]->get_schema_ref();
                if (lmach == "")
                        lmach = hostname;
                interface_names.push_back(liface);
                machine_names.push_back(lmach);
 //printf("Machine is %s\n",lmach.c_str());
 
+//                             Check if a schemaId constraint needs to be inserted.
+               if(schema_ref<0){ // can result from some kinds of splits
+                       schema_ref = Schema->get_table_ref(schema_name);
+               }
+               int schema_id = Schema->get_schema_id(schema_ref);  // id associated with PROTOCOL
+               int errnum = 0;
+               string if_error;
+               iface_t *iface = ifaces_db-> get_interface(lmach, liface, errnum, if_error);
+               if(iface==NULL){
+                       fprintf(stderr,"Error looking up the interface %s on host %s: %s\n",liface.c_str(), lmach.c_str(), if_error.c_str());
+                       exit(1);
+               }       
+               if(iface->has_multiple_schemas()){
+                       if(schema_id<0){        // invalid schema_id
+                               fprintf(stderr,"Error, schema %s has no schema_id, but is sourced from multi-schema interface %s (ref'd in query %s)\n", schema_name.c_str(), liface.c_str(), split_queries[0]->query_name.c_str());
+                               exit(1);
+                       }
+                       vector<string> iface_schemas = iface->get_property("Schemas");
+                       if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
+                               fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
+                               exit(1);
+                       }
+// Ensure that in liface, schema_id is used for only one schema
+                       if(schema_of_schemaid.count(liface)==0){
+                               map<int, string> empty_map;
+                               schema_of_schemaid[liface] = empty_map;
+                       }
+                       if(schema_of_schemaid[liface].count(schema_id)==0){
+                               schema_of_schemaid[liface][schema_id] = schema_name;
+                       }else{
+                               if(schema_of_schemaid[liface][schema_id] != schema_name){
+                                       fprintf(stderr, "ERROR, For interface %s, schema id %d is used in schemas %s and %s\n", liface.c_str(), schema_id, schema_name.c_str(), schema_of_schemaid[liface][schema_id].c_str());
+                                       exit(1);
+                               }
+                       }
+               }else{  // single-schema interface
+                       schema_id = -1; // don't generate schema_id predicate
+                       vector<string> iface_schemas = iface->get_property("Schemas");
+                       if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
+                               fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
+                               exit(1);
+                       }
+                       if(iface_schemas.size()>1){
+                               fprintf(stderr, "ERROR, interface %s is a single-schema interface, but has %d elements in its Schemas list.\n", liface.c_str(), (int)iface_schemas.size());
+                               exit(1);
+                       }
+               }                       
+
+// If we need to check the schema_id, insert a predicate into the lfta.
+//      TODO not just schema_id, the full all_schema_ids set.
+               if(schema_id>=0){
+                       colref_t *schid_cr = new colref_t("schemaId");
+                       schid_cr->schema_ref = schema_ref;
+                       schid_cr->tablevar_ref = 0;
+                       scalarexp_t *schid_se = new scalarexp_t(schid_cr);
+                       data_type *schid_dt = new data_type("uint");
+                       schid_se->dt = schid_dt;
+
+                       string schid_str = int_to_string(schema_id);
+                       literal_t *schid_lit = new literal_t(schid_str.c_str(), LITERAL_INT);
+                       scalarexp_t *lit_se = new scalarexp_t(schid_lit);
+                       lit_se->dt = schid_dt;
+
+                       predicate_t *schid_pr = new predicate_t(schid_se, "=", lit_se);
+                       vector<cnf_elem *> clist;
+                       make_cnf_from_pr(schid_pr, clist);
+                       analyze_cnf(clist[0]);
+                       clist[0]->cost = 1;     // cheap one comparison
+// cnf built, now insert it.
+                       split_queries[l]->query_plan[0]->append_to_where(clist[0]);
+               }
+                       
+
+
+
+
+
+
 //                     Set the ht size from the recommendation, if there is one in the rec file
                if(lfta_htsize.count(split_queries[l]->query_name)>0){
                        split_queries[l]->query_plan[0]->set_definition("aggregate_slots",int_to_string(lfta_htsize[split_queries[l]->query_name]));
@@ -2121,17 +2227,26 @@ for(i=0;i<hfta_list.size();++i){                // query also has an HFTA component
                        hfta_namespace = hfta_list[i]->defines["namespace"];
                  if(hfta_list[i]->defines.count("Namespace")>0)
                        hfta_namespace = hfta_list[i]->defines["Namespace"];
-                 if(hfta_list[i]->defines.count("Namespace")>0)
-                       hfta_namespace = hfta_list[i]->defines["Namespace"];
+                 if(hfta_list[i]->defines.count("NAMESPACE")>0)
+                       hfta_namespace = hfta_list[i]->defines["NAMESPACE"];
 
                  if(field_verifier != NULL){
                        string warning_str;
                        if(hfta_comment == "")
                                warning_str += "\tcomment not found.\n";
-                       if(hfta_title == "")
-                               warning_str += "\ttitle not found.\n";
-                       if(hfta_namespace == "")
-                               warning_str += "\tnamespace not found.\n";
+
+// Obsolete stuff that Carsten wanted
+//                     if(hfta_title == "")
+//                             warning_str += "\ttitle not found.\n";
+//                     if(hfta_namespace == "")
+//                             warning_str += "\tnamespace not found.\n";
+
+// STOPPED HERE
+//     There is a get_tbl_keys method implemented for qp_nodes,
+//     integrate it into steam_query, then call it to find keys,
+//     and annotate feidls with their key-ness.
+//     If there is a "keys" proprty in the defines block, override anything returned
+//     from the automated analysis
 
                        vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
                        int fi;
@@ -2143,6 +2258,62 @@ for(i=0;i<hfta_list.size();++i){         // query also has an HFTA component
                                        hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(),warning_str.c_str());
                  }
 
+// Get the fields in this query
+                 vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
+
+// do key processing
+                 string hfta_keys_s = "";
+                 if(hfta_list[i]->defines.count("keys")>0)
+                       hfta_keys_s = hfta_list[i]->defines["keys"];
+                 if(hfta_list[i]->defines.count("Keys")>0)
+                       hfta_keys_s = hfta_list[i]->defines["Keys"];
+                 if(hfta_list[i]->defines.count("KEYS")>0)
+                       hfta_keys_s = hfta_list[i]->defines["KEYS"];
+                 string xtra_keys_s = "";
+                 if(hfta_list[i]->defines.count("extra_keys")>0)
+                       xtra_keys_s = hfta_list[i]->defines["extra_keys"];
+                 if(hfta_list[i]->defines.count("Extra_Keys")>0)
+                       xtra_keys_s = hfta_list[i]->defines["Extra_Keys"];
+                 if(hfta_list[i]->defines.count("EXTRA_KEYS")>0)
+                       xtra_keys_s = hfta_list[i]->defines["EXTRA_KEYS"];
+// get the keys
+                 vector<string> hfta_keys;
+                 vector<string> partial_keys;
+                 vector<string> xtra_keys;
+                 if(hfta_keys_s==""){
+                               hfta_keys = hfta_list[i]->get_tbl_keys(partial_keys);
+                               if(xtra_keys_s.size()>0){
+                                       xtra_keys = split_string(xtra_keys_s, ',');
+                               }
+                               for(int xi=0;xi<xtra_keys.size();++xi){
+                                       if(find(hfta_keys.begin(), hfta_keys.end(), xtra_keys[xi]) == hfta_keys.end()){
+                                               hfta_keys.push_back(xtra_keys[xi]);
+                                       }
+                               }
+                 }else{
+                               hfta_keys = split_string(hfta_keys_s, ',');
+                 }
+// validate that all of the keys exist in the output.
+//     (exit on error, as its a bad specificiation)
+                 vector<string> missing_keys;
+                 for(int ki=0;ki<hfta_keys.size(); ++ki){
+                       int fi;
+                       for(fi=0;fi<flds.size();++fi){
+                               if(hfta_keys[ki] == flds[fi]->get_name())
+                                       break;
+                       }
+                       if(fi==flds.size())
+                               missing_keys.push_back(hfta_keys[ki]);
+                 }
+                 if(missing_keys.size()>0){
+                       fprintf(stderr, "Error, HFTA stream %s has keys defined which aren't in the outputg:", hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
+                       for(int hi=0; hi<missing_keys.size(); ++hi){
+                               fprintf(stderr," %s", missing_keys[hi].c_str());
+                       }
+                       fprintf(stderr,"\n");
+                       exit(1);
+                 }
+
                  fprintf(qtree_output,"\t<HFTA name='%s'>\n",hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
                  if(hfta_comment != "")
                        fprintf(qtree_output,"\t\t<Description value='%s' />\n",hfta_comment.c_str());
@@ -2154,15 +2325,25 @@ for(i=0;i<hfta_list.size();++i){                // query also has an HFTA component
                  fprintf(qtree_output,"\t\t<Rate value='100' />\n");
 
 //                             write info about fields to qtree.xml
-                 vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
                  int fi;
                  for(fi=0;fi<flds.size();fi++){
                        fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
                        if(flds[fi]->get_modifier_list()->size()){
-                               fprintf(qtree_output,"mods='%s' ", flds[fi]->get_modifier_list()->to_string().c_str());
+                               fprintf(qtree_output,"mods='%s'", flds[fi]->get_modifier_list()->to_string().c_str());
                        }
                        fprintf(qtree_output," />\n");
                  }
+// info about keys
+                 for(int hi=0;hi<hfta_keys.size();++hi){
+                       fprintf(qtree_output, "\t\t<Key value='%s'/>\n",hfta_keys[hi].c_str());
+                 }
+                 for(int hi=0;hi<partial_keys.size();++hi){
+                       fprintf(qtree_output, "\t\t<PartialKey value='%s'/>\n",partial_keys[hi].c_str());
+                 }
+                 for(int hi=0;hi<xtra_keys.size();++hi){
+                       fprintf(qtree_output, "\t\t<ExtraKey value='%s'/>\n",xtra_keys[hi].c_str());
+                 }
+
 
                  // extract liveness timeout from query definition
                  int liveness_timeout = atoi(hfta_list[i]->query_plan[hfta_list[i]->qhead]->get_val_of_def("liveness_timeout").c_str());
@@ -2470,8 +2651,9 @@ for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_e
 
                                // combine all values for the interface property using comma separator
                                vector<string> vals = ifaces_db->get_iface_vals(lmach,*sir,iface_properties[i],erri,err_str);
+                               lfta_val[lmach] += "\t\t\treturn \"";
                                for (int j = 0; j < vals.size(); ++j) {
-                                       lfta_val[lmach] += "\t\t\treturn \"" + vals[j];
+                                       lfta_val[lmach] +=  vals[j];
                                        if (j != vals.size()-1)
                                                lfta_val[lmach] += ",";
                                }
@@ -2599,10 +2781,11 @@ for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_e
                        string warning_str;
                        if(lfta_comment == "")
                                warning_str += "\tcomment not found.\n";
-                       if(lfta_title == "")
-                               warning_str += "\ttitle not found.\n";
-                       if(lfta_namespace == "")
-                               warning_str += "\tnamespace not found.\n";
+// Obsolete stuff that carsten wanted
+//                     if(lfta_title == "")
+//                             warning_str += "\ttitle not found.\n";
+//                     if(lfta_namespace == "")
+//                             warning_str += "\tnamespace not found.\n";
 
                        vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
                        int fi;
@@ -2774,6 +2957,9 @@ void generate_makefile(vector<string> &input_file_names, int nfiles,
        }
 
 //             Do we need to include protobuf libraries?
+//             TODO Move to the interface library: get the libraries to include 
+//             for an interface type
+
        bool use_proto = false;
        int erri;
        string err_str;