Add support for query key extraction
[com/gs-lite.git] / src / ftacmp / translate_fta.cc
index 3a18f69..091fafe 100644 (file)
@@ -30,6 +30,8 @@ Copyright 2014 AT&T Intellectual Property
 #include <stdlib.h>
 #include <stdio.h>
 #include<ctype.h>
+#include<glob.h>
+#include<string.h>
 
 #include<list>
 
@@ -553,11 +555,37 @@ int main(int argc, char **argv){
          }
          Schema = fta_parse_result->tables;
 
+//                     Ensure that all schema_ids, if set, are distinct.
+//  Obsolete? There is code elsewhere to ensure that schema IDs are
+//  distinct on a per-interface basis.
+/*
+         set<int> found_ids;
+         set<int> dup_ids;
+         for(int t=0;t<Schema->size();++t){
+               int sch_id = Schema->get_table(t)->get_schema_id();
+               if(sch_id> 0){
+                       if(found_ids.find(sch_id) != found_ids.end()){
+                               dup_ids.insert(sch_id);
+                       }else{
+                               found_ids.insert(sch_id);
+                       }
+               }
+               if(dup_ids.size()>0){
+                       fprintf(stderr, "Error, the schema has duplicate schema_ids:");
+                       for(auto dit=dup_ids.begin();dit!=dup_ids.end(); ++dit)
+                               fprintf(stderr," %d",(*dit));
+                       fprintf(stderr,"\n");
+                       exit(1);
+               }
+         }
+*/
+
+
 //                     Process schema field inheritance
          int retval;
          retval = Schema->unroll_tables(err_str);
          if(retval){
-               fprintf(stderr,"Error processing schema filed inheritance:\n %s\n", err_str.c_str() );
+               fprintf(stderr,"Error processing schema field inheritance:\n %s\n", err_str.c_str() );
                exit(1);
          }
        }else{
@@ -610,12 +638,31 @@ int main(int argc, char **argv){
 "#include \"fta.h\"\n"
 "#include \"lapp.h\"\n"
 "#include \"rts_udaf.h\"\n\n"
+;
+// Get any locally defined parsing headers
+    glob_t glob_result;
+    memset(&glob_result, 0, sizeof(glob_result));
+
+    // do the glob operation
+    int return_value = glob("../../include/lfta/local/*h", GLOB_TILDE, NULL, &glob_result);
+       if(return_value == 0){
+       for(size_t i = 0; i < glob_result.gl_pathc; ++i) {
+                       char *flds[1000];
+                       int nflds = split_string(glob_result.gl_pathv[i],'/',flds,1000);
+                       lfta_header += "#include \"local/"+string(flds[nflds-1])+"\"\n\n";
+       }
+       }else{
+               fprintf(stderr,"Warning, glob on ../../include/lfta/local/*h failed.\n");
+       }
+
 /*
 "#define IS_FILLED(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> ((bucket & 15)<<1)))\n"
 "#define IS_NEW(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> (((bucket & 15) << 1) + 1)))\n"
 "#define SET_EMPTY(bitmap,bucket) (bitmap[bucket >> 4] &= (~(0x80000000 >> ((bucket & 15)<<1))))\n"
 "#define SET_FILLED_AND_NEW(bitmap,bucket) (bitmap[bucket >> 4] |= (0xC0000000 >> ((bucket & 15)<<1)))\n"
 */
+
+       lfta_header += 
 "\n"
 "gs_uint64_t (*lfta_prefilter)(void *pkt) = NULL; // set at fta_init\n"
 "\n"
@@ -1604,11 +1651,11 @@ for(q=0;q<hfta_sets.size();++q){
 //                             TODO: separate building operators from spliting lftas,
 //                                     that will make optimizations such as predicate pushing easier.
        vector<stream_query *> lfta_list;
-
        stream_query *rootq;
-
     int qi,qj;
 
+       map<string, map<int, string> > schema_of_schemaid; // ensure that schema IDs are unique to an <interface, schema>
+
        for(qi=hfta_topsort.size()-1;qi>=0;--qi){
 
        int hfta_id = hfta_topsort[qi];
@@ -1841,21 +1888,101 @@ for(q=0;q<hfta_sets.size();++q){
 //                             Compute the number of LFTAs.
          int n_lfta = split_queries.size();
          if(hfta_returned) n_lfta--;
-
+//                             Check if a schemaId constraint needs to be inserted.
 
 //                             Process the LFTA components.
          for(l=0;l<n_lfta;++l){
           if(lfta_names.count(split_queries[l]->query_name) == 0){
 //                             Grab the lfta for global optimization.
                vector<tablevar_t *> tvec =  split_queries[l]->query_plan[0]->get_input_tbls();
-               string liface = tvec[0]->get_interface();
+               string liface = tvec[0]->get_interface();       // iface queries have been resolved
                string lmach = tvec[0]->get_machine();
+               string schema_name = tvec[0]->get_schema_name();
+               int schema_ref = tvec[0]->get_schema_ref();
                if (lmach == "")
                        lmach = hostname;
                interface_names.push_back(liface);
                machine_names.push_back(lmach);
 //printf("Machine is %s\n",lmach.c_str());
 
+//                             Check if a schemaId constraint needs to be inserted.
+               if(schema_ref<0){ // can result from some kinds of splits
+                       schema_ref = Schema->get_table_ref(schema_name);
+               }
+               int schema_id = Schema->get_schema_id(schema_ref);  // id associated with PROTOCOL
+               int errnum = 0;
+               string if_error;
+               iface_t *iface = ifaces_db-> get_interface(lmach, liface, errnum, if_error);
+               if(iface==NULL){
+                       fprintf(stderr,"Error looking up the interface %s on host %s: %s\n",liface.c_str(), lmach.c_str(), if_error.c_str());
+                       exit(1);
+               }       
+               if(iface->has_multiple_schemas()){
+                       if(schema_id<0){        // invalid schema_id
+                               fprintf(stderr,"Error, schema %s has no schema_id, but is sourced from multi-schema interface %s (ref'd in query %s)\n", schema_name.c_str(), liface.c_str(), split_queries[0]->query_name.c_str());
+                               exit(1);
+                       }
+                       vector<string> iface_schemas = iface->get_property("Schemas");
+                       if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
+                               fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
+                               exit(1);
+                       }
+// Ensure that in liface, schema_id is used for only one schema
+                       if(schema_of_schemaid.count(liface)==0){
+                               map<int, string> empty_map;
+                               schema_of_schemaid[liface] = empty_map;
+                       }
+                       if(schema_of_schemaid[liface].count(schema_id)==0){
+                               schema_of_schemaid[liface][schema_id] = schema_name;
+                       }else{
+                               if(schema_of_schemaid[liface][schema_id] != schema_name){
+                                       fprintf(stderr, "ERROR, For interface %s, schema id %d is used in schemas %s and %s\n", liface.c_str(), schema_id, schema_name.c_str(), schema_of_schemaid[liface][schema_id].c_str());
+                                       exit(1);
+                               }
+                       }
+               }else{  // single-schema interface
+                       schema_id = -1; // don't generate schema_id predicate
+                       vector<string> iface_schemas = iface->get_property("Schemas");
+                       if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
+                               fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
+                               exit(1);
+                       }
+                       if(iface_schemas.size()>1){
+                               fprintf(stderr, "ERROR, interface %s is a single-schema interface, but has %d elements in its Schemas list.\n", liface.c_str(), (int)iface_schemas.size());
+                               exit(1);
+                       }
+               }                       
+
+// If we need to check the schema_id, insert a predicate into the lfta.
+//      TODO not just schema_id, the full all_schema_ids set.
+               if(schema_id>=0){
+                       colref_t *schid_cr = new colref_t("schemaId");
+                       schid_cr->schema_ref = schema_ref;
+                       schid_cr->tablevar_ref = 0;
+                       scalarexp_t *schid_se = new scalarexp_t(schid_cr);
+                       data_type *schid_dt = new data_type("uint");
+                       schid_se->dt = schid_dt;
+
+                       string schid_str = int_to_string(schema_id);
+                       literal_t *schid_lit = new literal_t(schid_str.c_str(), LITERAL_INT);
+                       scalarexp_t *lit_se = new scalarexp_t(schid_lit);
+                       lit_se->dt = schid_dt;
+
+                       predicate_t *schid_pr = new predicate_t(schid_se, "=", lit_se);
+                       vector<cnf_elem *> clist;
+                       make_cnf_from_pr(schid_pr, clist);
+                       analyze_cnf(clist[0]);
+                       clist[0]->cost = 1;     // cheap one comparison
+// cnf built, now insert it.
+                       split_queries[l]->query_plan[0]->append_to_where(clist[0]);
+               }
+                       
+
+
+
+
+
+
 //                     Set the ht size from the recommendation, if there is one in the rec file
                if(lfta_htsize.count(split_queries[l]->query_name)>0){
                        split_queries[l]->query_plan[0]->set_definition("aggregate_slots",int_to_string(lfta_htsize[split_queries[l]->query_name]));
@@ -2100,17 +2227,26 @@ for(i=0;i<hfta_list.size();++i){                // query also has an HFTA component
                        hfta_namespace = hfta_list[i]->defines["namespace"];
                  if(hfta_list[i]->defines.count("Namespace")>0)
                        hfta_namespace = hfta_list[i]->defines["Namespace"];
-                 if(hfta_list[i]->defines.count("Namespace")>0)
-                       hfta_namespace = hfta_list[i]->defines["Namespace"];
+                 if(hfta_list[i]->defines.count("NAMESPACE")>0)
+                       hfta_namespace = hfta_list[i]->defines["NAMESPACE"];
 
                  if(field_verifier != NULL){
                        string warning_str;
                        if(hfta_comment == "")
                                warning_str += "\tcomment not found.\n";
-                       if(hfta_title == "")
-                               warning_str += "\ttitle not found.\n";
-                       if(hfta_namespace == "")
-                               warning_str += "\tnamespace not found.\n";
+
+// Obsolete stuff that Carsten wanted
+//                     if(hfta_title == "")
+//                             warning_str += "\ttitle not found.\n";
+//                     if(hfta_namespace == "")
+//                             warning_str += "\tnamespace not found.\n";
+
+// STOPPED HERE
+//     There is a get_tbl_keys method implemented for qp_nodes,
+//     integrate it into stream_query, then call it to find keys,
+//     and annotate fields with their key-ness.
+//     If there is a "keys" property in the defines block, override anything returned
+//     from the automated analysis
 
                        vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
                        int fi;
@@ -2122,6 +2258,62 @@ for(i=0;i<hfta_list.size();++i){         // query also has an HFTA component
                                        hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(),warning_str.c_str());
                  }
 
+// Get the fields in this query
+                 vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
+
+// do key processing
+                 string hfta_keys_s = "";
+                 if(hfta_list[i]->defines.count("keys")>0)
+                       hfta_keys_s = hfta_list[i]->defines["keys"];
+                 if(hfta_list[i]->defines.count("Keys")>0)
+                       hfta_keys_s = hfta_list[i]->defines["Keys"];
+                 if(hfta_list[i]->defines.count("KEYS")>0)
+                       hfta_keys_s = hfta_list[i]->defines["KEYS"];
+                 string xtra_keys_s = "";
+                 if(hfta_list[i]->defines.count("extra_keys")>0)
+                       xtra_keys_s = hfta_list[i]->defines["extra_keys"];
+                 if(hfta_list[i]->defines.count("Extra_Keys")>0)
+                       xtra_keys_s = hfta_list[i]->defines["Extra_Keys"];
+                 if(hfta_list[i]->defines.count("EXTRA_KEYS")>0)
+                       xtra_keys_s = hfta_list[i]->defines["EXTRA_KEYS"];
+// get the keys
+                 vector<string> hfta_keys;
+                 vector<string> partial_keys;
+                 vector<string> xtra_keys;
+                 if(hfta_keys_s==""){
+                               hfta_keys = hfta_list[i]->get_tbl_keys(partial_keys);
+                               if(xtra_keys_s.size()>0){
+                                       xtra_keys = split_string(xtra_keys_s, ',');
+                               }
+                               for(int xi=0;xi<xtra_keys.size();++xi){
+                                       if(find(hfta_keys.begin(), hfta_keys.end(), xtra_keys[xi]) == hfta_keys.end()){
+                                               hfta_keys.push_back(xtra_keys[xi]);
+                                       }
+                               }
+                 }else{
+                               hfta_keys = split_string(hfta_keys_s, ',');
+                 }
+// validate that all of the keys exist in the output.
+//     (exit on error, as it's a bad specification)
+                 vector<string> missing_keys;
+                 for(int ki=0;ki<hfta_keys.size(); ++ki){
+                       int fi;
+                       for(fi=0;fi<flds.size();++fi){
+                               if(hfta_keys[ki] == flds[fi]->get_name())
+                                       break;
+                       }
+                       if(fi==flds.size())
+                               missing_keys.push_back(hfta_keys[ki]);
+                 }
+                 if(missing_keys.size()>0){
+                       fprintf(stderr, "Error, HFTA stream %s has keys defined which aren't in the outputg:", hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
+                       for(int hi=0; hi<missing_keys.size(); ++hi){
+                               fprintf(stderr," %s", missing_keys[hi].c_str());
+                       }
+                       fprintf(stderr,"\n");
+                       exit(1);
+                 }
+
                  fprintf(qtree_output,"\t<HFTA name='%s'>\n",hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
                  if(hfta_comment != "")
                        fprintf(qtree_output,"\t\t<Description value='%s' />\n",hfta_comment.c_str());
@@ -2133,15 +2325,25 @@ for(i=0;i<hfta_list.size();++i){                // query also has an HFTA component
                  fprintf(qtree_output,"\t\t<Rate value='100' />\n");
 
 //                             write info about fields to qtree.xml
-                 vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
                  int fi;
                  for(fi=0;fi<flds.size();fi++){
                        fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
                        if(flds[fi]->get_modifier_list()->size()){
-                               fprintf(qtree_output,"mods='%s' ", flds[fi]->get_modifier_list()->to_string().c_str());
+                               fprintf(qtree_output,"mods='%s'", flds[fi]->get_modifier_list()->to_string().c_str());
                        }
                        fprintf(qtree_output," />\n");
                  }
+// info about keys
+                 for(int hi=0;hi<hfta_keys.size();++hi){
+                       fprintf(qtree_output, "\t\t<Key value='%s'/>\n",hfta_keys[hi].c_str());
+                 }
+                 for(int hi=0;hi<partial_keys.size();++hi){
+                       fprintf(qtree_output, "\t\t<PartialKey value='%s'/>\n",partial_keys[hi].c_str());
+                 }
+                 for(int hi=0;hi<xtra_keys.size();++hi){
+                       fprintf(qtree_output, "\t\t<ExtraKey value='%s'/>\n",xtra_keys[hi].c_str());
+                 }
+
 
                  // extract liveness timeout from query definition
                  int liveness_timeout = atoi(hfta_list[i]->query_plan[hfta_list[i]->qhead]->get_val_of_def("liveness_timeout").c_str());
@@ -2449,8 +2651,9 @@ for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_e
 
                                // combine all values for the interface property using comma separator
                                vector<string> vals = ifaces_db->get_iface_vals(lmach,*sir,iface_properties[i],erri,err_str);
+                               lfta_val[lmach] += "\t\t\treturn \"";
                                for (int j = 0; j < vals.size(); ++j) {
-                                       lfta_val[lmach] += "\t\t\treturn \"" + vals[j];
+                                       lfta_val[lmach] +=  vals[j];
                                        if (j != vals.size()-1)
                                                lfta_val[lmach] += ",";
                                }
@@ -2499,11 +2702,17 @@ for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_e
                
                        fprintf(stderr,"interface %s, LFTA %d, snap length is %d\n",liface.c_str(),mi,snap_lengths[mi]);
 
-               lfta_val[lmach] += "\tfta_register(\""+query_names[mi]+"\", " + (lfta_reuse_options[mi]?"1":"0") + ", ";
-               if(interface_names[mi]=="")
-                               lfta_val[lmach]+="DEFAULTDEV";
-               else
-                               lfta_val[lmach]+='"'+interface_names[mi]+'"';
+                       string this_iface = "DEFAULTDEV";
+                       if(interface_names[mi]!="")
+                               this_iface = '"'+interface_names[mi]+'"';
+                       lfta_val[lmach] += "\tif(!strcmp(device,"+this_iface+"))\n";
+               lfta_val[lmach] += "\t\tfta_register(\""+query_names[mi]+"\", " + (lfta_reuse_options[mi]?"1":"0") + ", ";
+//             if(interface_names[mi]=="")
+//                             lfta_val[lmach]+="DEFAULTDEV";
+//             else
+//                             lfta_val[lmach]+='"'+interface_names[mi]+'"';
+                       lfta_val[lmach] += this_iface;
+
 
                lfta_val[lmach] += ", "+generate_alloc_name(query_names[mi])
                        +"\n#ifndef LFTA_IN_NIC\n\t,"+generate_schema_string_name(query_names[mi])
@@ -2572,10 +2781,11 @@ for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_e
                        string warning_str;
                        if(lfta_comment == "")
                                warning_str += "\tcomment not found.\n";
-                       if(lfta_title == "")
-                               warning_str += "\ttitle not found.\n";
-                       if(lfta_namespace == "")
-                               warning_str += "\tnamespace not found.\n";
+// Obsolete stuff that carsten wanted
+//                     if(lfta_title == "")
+//                             warning_str += "\ttitle not found.\n";
+//                     if(lfta_namespace == "")
+//                             warning_str += "\tnamespace not found.\n";
 
                        vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
                        int fi;
@@ -2726,6 +2936,43 @@ void generate_makefile(vector<string> &input_file_names, int nfiles,
        if(generate_stats)
                fprintf(outfl,"  -DLFTA_STATS");
 
+//             Gather the set of interfaces
+//             Also, gather "base interface names" for use in computing
+//             the hash splitting to virtual interfaces.
+//             TODO : must update to handle machines
+       set<string> ifaces;
+       set<string> base_vifaces;       // base interfaces of virtual interfaces
+       map<string, string> ifmachines;
+       map<string, string> ifattrs;
+       for(i=0;i<interface_names.size();++i){
+               ifaces.insert(interface_names[i]);
+               ifmachines[interface_names[i]] = machine_names[i];
+
+               size_t Xpos = interface_names[i].find_last_of("X");
+               if(Xpos!=string::npos){
+                       string iface = interface_names[i].substr(0,Xpos);
+                       base_vifaces.insert(iface);
+               }
+               // get interface attributes and add them to the list
+       }
+
+//             Do we need to include protobuf libraries?
+//             TODO Move to the interface library: get the libraries to include 
+//             for an interface type
+
+       bool use_proto = false;
+       int erri;
+       string err_str;
+       for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
+               string ifnm = (*ssi);
+               vector<string> ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "InterfaceType", erri, err_str);
+               for(int ift_i=0;ift_i<ift.size();ift_i++){
+                       if(ift[ift_i]=="PROTO"){
+                               use_proto = true;
+                       }
+               }
+       }
+
        fprintf(outfl,
 "\n"
 "\n"
@@ -2746,11 +2993,13 @@ void generate_makefile(vector<string> &input_file_names, int nfiles,
        fprintf(outfl,
 "-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux  -lclearinghouse -lresolv -lpthread -lgscpinterface -lz");
        if(use_pads)
-               fprintf(outfl, "-lpz -lz -lbz ");
+               fprintf(outfl, " -lpz -lz -lbz ");
        if(libz_exists && libast_exists)
-               fprintf(outfl,"-last ");
+               fprintf(outfl," -last ");
        if(use_pads)
-               fprintf(outfl, "-ldll -ldl ");
+               fprintf(outfl, " -ldll -ldl ");
+       if(use_proto)
+               fprintf(outfl, " -L/usr/local/lib/ -lprotobuf-c ");
        fprintf(outfl," -lgscpaux");
 #ifdef GCOV
        fprintf(outfl," -fprofile-arcs");
@@ -2815,25 +3064,6 @@ void generate_makefile(vector<string> &input_file_names, int nfiles,
                exit(0);
        }
 
-//             Gather the set of interfaces
-//             Also, gather "base interface names" for use in computing
-//             the hash splitting to virtual interfaces.
-//             TODO : must update to hanndle machines
-       set<string> ifaces;
-       set<string> base_vifaces;       // base interfaces of virtual interfaces
-       map<string, string> ifmachines;
-       map<string, string> ifattrs;
-       for(i=0;i<interface_names.size();++i){
-               ifaces.insert(interface_names[i]);
-               ifmachines[interface_names[i]] = machine_names[i];
-
-               size_t Xpos = interface_names[i].find_last_of("X");
-               if(Xpos!=string::npos){
-                       string iface = interface_names[i].substr(0,Xpos);
-                       base_vifaces.insert(iface);
-               }
-               // get interface attributes and add them to the list
-       }
 
        fputs(
 ("#!/bin/sh\n"
@@ -2848,8 +3078,8 @@ void generate_makefile(vector<string> &input_file_names, int nfiles,
 "ADDR=`cat gshub.log`\n"
 "ps opgid= $! >> gs.pids\n"
 "./rts $ADDR default ").c_str(), outfl);
-       int erri;
-       string err_str;
+//     int erri;
+//     string err_str;
        for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
                string ifnm = (*ssi);
                fprintf(outfl, "%s ",ifnm.c_str());