-/* ------------------------------------------------\r
-Copyright 2014 AT&T Intellectual Property\r
- Licensed under the Apache License, Version 2.0 (the "License");\r
- you may not use this file except in compliance with the License.\r
- You may obtain a copy of the License at\r
-\r
- http://www.apache.org/licenses/LICENSE-2.0\r
-\r
- Unless required by applicable law or agreed to in writing, software\r
- distributed under the License is distributed on an "AS IS" BASIS,\r
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- See the License for the specific language governing permissions and\r
- limitations under the License.\r
- ------------------------------------------- */\r
-\r
-#include <unistd.h> // for gethostname\r
-\r
-#include <string>\r
-#include "parse_fta.h"\r
-#include "parse_schema.h"\r
-#include "parse_ext_fcns.h"\r
-#include"analyze_fta.h"\r
-#include"query_plan.h"\r
-#include"generate_lfta_code.h"\r
-#include"stream_query.h"\r
-#include"generate_utils.h"\r
-#include"nic_def.h"\r
-#include"generate_nic_code.h"\r
-\r
-#include <stdlib.h>\r
-#include <stdio.h>\r
-#include <ctype.h>\r
-#include <glob.h>\r
-#include <string.h>\r
-\r
-#include <list>\r
-\r
-// for the scandir\r
- #include <sys/types.h>\r
- #include <dirent.h>\r
-\r
-\r
-#include<errno.h>\r
-\r
-// to verify that some files exist.\r
- #include <sys/types.h>\r
- #include <sys/stat.h>\r
-\r
-#include "parse_partn.h"\r
-\r
-#include "print_plan.h"\r
-\r
-// Interface to the xml parser\r
-\r
-#include"xml_t.h"\r
-#include"field_list.h"\r
-\r
-extern int xmlParserparse(void);\r
-extern FILE *xmlParserin;\r
-extern int xmlParserdebug;\r
-\r
-std::vector<std::string> xml_attr_vec;\r
-std::vector<std::string> xml_val_vec;\r
-std::string xml_a, xml_v;\r
-xml_t *xml_leaves = NULL;\r
-\r
-// Interface to the field list verifier\r
-field_list *field_verifier = NULL;\r
-\r
-#define TMPSTRLEN 1000\r
-\r
-#ifndef PATH_DELIM\r
- #define PATH_DELIM '/'\r
-#endif\r
-\r
-char tmp_schema_str[10000];\r
-\r
-// maximum delay between two heartbeats produced\r
-// by a UDOP. Used when it is not explicitly\r
-// provided in the udop definition\r
-#define DEFAULT_UDOP_LIVENESS_TIMEOUT 5\r
-\r
-// Default lfta hash table size, must be power of 2.\r
-int DEFAULT_LFTA_HASH_TABLE_SIZE = 4096;\r
-\r
-// Interface to FTA definition lexer and parser ...\r
-\r
-extern int FtaParserparse(void);\r
-extern FILE *FtaParserin;\r
-extern int FtaParserdebug;\r
-\r
-fta_parse_t *fta_parse_result;\r
-var_defs_t *fta_parse_defines;\r
-\r
-\r
-\r
-// Interface to external function lexer and parser ...\r
-\r
-extern int Ext_fcnsParserparse(void);\r
-extern FILE *Ext_fcnsParserin;\r
-extern int Ext_fcnsParserdebug;\r
-\r
-ext_fcn_list *Ext_fcns;\r
-\r
-\r
-// Interface to partition definition parser\r
-extern int PartnParserparse();\r
-partn_def_list_t *partn_parse_result = NULL;\r
-\r
-\r
-\r
-using namespace std;\r
-//extern int errno;\r
-\r
-\r
-// forward declaration of local utility function\r
-void generate_makefile(vector<string> &input_file_names, int nfiles,\r
- vector<string> &hfta_names, opview_set &opviews,\r
- vector<string> &machine_names,\r
- string schema_file_name,\r
- vector<string> &interface_names,\r
- ifq_t *ifdb, string &config_dir_path,\r
- bool use_pads,\r
- string extra_libs,\r
- map<string, vector<int> > &rts_hload\r
- );\r
-\r
-//static int split_string(char *instr,char sep, char **words,int max_words);\r
-#define MAXFLDS 100\r
-\r
- FILE *schema_summary_output = NULL; // schema summary output file\r
-\r
-// Dump schema summary\r
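-// Output format: the query name on one line, then a '|'-separated list of\r
-// field names, then a '|'-separated list of field types.\r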
-void dump_summary(stream_query *str){\r
- fprintf(schema_summary_output,"%s\n",str->query_name.c_str());\r
-\r
- table_def *sch = str->get_output_tabledef();\r
-\r
- vector<field_entry *> flds = sch->get_fields();\r
- int f;\r
- for(f=0;f<flds.size();++f){\r
- if(f>0) fprintf(schema_summary_output,"|");\r
- fprintf(schema_summary_output,"%s",flds[f]->get_name().c_str());\r
- }\r
- fprintf(schema_summary_output,"\n");\r
- for(f=0;f<flds.size();++f){\r
- if(f>0) fprintf(schema_summary_output,"|");\r
- fprintf(schema_summary_output,"%s",flds[f]->get_type().c_str());\r
- }\r
- fprintf(schema_summary_output,"\n");\r
-}\r
-\r
-// Globals\r
-string hostname; // name of current host.\r
-int hostname_len;\r
-bool generate_stats = false;\r
-string root_path = "../..";\r
-\r
-\r
-int main(int argc, char **argv){\r
- char tmpstr[TMPSTRLEN];\r
- string err_str;\r
- int q,s,h,f;\r
-\r
- set<int>::iterator si;\r
-\r
- vector<string> query_names; // for lfta.c registration\r
- map<string, vector<int> > mach_query_names; // list queries of machine\r
- vector<int> snap_lengths; // for lfta.c registration\r
- vector<string> interface_names; // for lfta.c registration\r
- vector<string> machine_names; // machine of interface\r
- vector<bool> lfta_reuse_options; // for lfta.c registration\r
- vector<int> lfta_liveness_timeouts; // for qtree.xml generation\r
- vector<string> hfta_names; // hfta source code names, for\r
- // creating make file.\r
- vector<string> qnames; // ensure unique names\r
- map<string, int> lfta_names; // keep track of unique lftas.\r
-\r
-\r
-// set these to 1 to debug the parser\r
- FtaParserdebug = 0;\r
- Ext_fcnsParserdebug = 0;\r
-\r
- FILE *lfta_out; // lfta.c output.\r
- FILE *fta_in; // input file\r
- FILE *table_schemas_in; // source tables definition file\r
- FILE *query_name_output; // query names\r
- FILE *qtree_output; // interconnections of query nodes\r
-\r
- // -------------------------------\r
- // Handling of Input Arguments\r
- // -------------------------------\r
- char optstr[] = "BDpLC:l:HNQMfPSh:n:cR:"; // -f is a flag and takes no argument\r
- const char *usage_str = "Usage: %s [-B] [-D] [-p] [-L] [-N] [-H] [-Q] [-M] [-C <config directory>] [-l <library directory>] [-f] [-P] [-S] [-n n_virtual_interfaces] [-h hostname] [-R root_path] [schema_file] input_file [input file ...]\n"\r
- "\t[-B] : debug only (don't create output files)\n"\r
- "\t[-D] : distributed mode (will use cluster.ifq instead of local .ifq file)\n"\r
- "\t[-p] : partitioned mode (will use partition information in partition.txt to drive query optimization)\n"\r
- "\t[-L] : use live_hosts.txt file to restrict queries to a set of live hosts\n"\r
- "\t[-C] : use <config directory> for definition files\n"\r
- "\t[-l] : use <library directory> for library queries\n"\r
- "\t[-N] : output query names in query_names.txt\n"\r
- "\t[-H] : create HFTA only (no schema_file)\n"\r
- "\t[-Q] : use query name for hfta suffix\n"\r
- "\t[-M] : generate make file and runit, stopit scripts\n"\r
- "\t[-S] : enable LFTA statistics (alters Makefile).\n"\r
- "\t[-f] : Output schema summary to schema_summary.txt\n"\r
- "\t[-P] : link with PADS\n"\r
- "\t[-h] : override host name.\n"\r
- "\t[-c] : clean out Makefile and hfta_*.cc first.\n"\r
- "\t[-R] : path to root of GS-lite\n"\r
-;\r
-\r
-// parameters gathered from command line processing\r
- string external_fcns_path;\r
-// string internal_fcn_path;\r
- string config_dir_path;\r
- string library_path = "./";\r
- vector<string> input_file_names;\r
- string schema_file_name;\r
- bool debug_only = false;\r
- bool hfta_only = false;\r
- bool output_query_names = false;\r
- bool output_schema_summary=false;\r
- bool numeric_hfta_flname = true;\r
- bool create_makefile = false;\r
- bool distributed_mode = false;\r
- bool partitioned_mode = false;\r
- bool use_live_hosts_file = false;\r
- bool use_pads = false;\r
- bool clean_make = false;\r
- int n_virtual_interfaces = 1;\r
-\r
- int chopt; // getopt() returns int\r
- while((chopt = getopt(argc,argv,optstr)) != -1){\r
- switch(chopt){\r
- case 'B':\r
- debug_only = true;\r
- break;\r
- case 'D':\r
- distributed_mode = true;\r
- break;\r
- case 'p':\r
- partitioned_mode = true;\r
- break;\r
- case 'L':\r
- use_live_hosts_file = true;\r
- break;\r
- case 'C':\r
- if(optarg != NULL)\r
- config_dir_path = string(optarg) + string("/");\r
- break;\r
- case 'l':\r
- if(optarg != NULL)\r
- library_path = string(optarg) + string("/");\r
- break;\r
- case 'N':\r
- output_query_names = true;\r
- break;\r
- case 'Q':\r
- numeric_hfta_flname = false;\r
- break;\r
- case 'H':\r
- if(schema_file_name == ""){\r
- hfta_only = true;\r
- }\r
- break;\r
- case 'f':\r
- output_schema_summary=true;\r
- break;\r
- case 'M':\r
- create_makefile=true;\r
- break;\r
- case 'S':\r
- generate_stats=true;\r
- break;\r
- case 'P':\r
- use_pads = true;\r
- break;\r
- case 'c':\r
- clean_make = true;\r
- break;\r
- case 'h':\r
- if(optarg != NULL)\r
- hostname = optarg;\r
- break;\r
- case 'R':\r
- if(optarg != NULL)\r
- root_path = optarg;\r
- break;\r
- case 'n':\r
- if(optarg != NULL){\r
- n_virtual_interfaces = atoi(optarg);\r
- if(n_virtual_interfaces < 1 || n_virtual_interfaces > 128){\r
- fprintf(stderr,"Warning, %d virtual interfaces specified, valid values are between 1 and 128 inclusive\n",n_virtual_interfaces);\r
- n_virtual_interfaces = 1;\r
- }\r
- }\r
- break;\r
- case '?':\r
- fprintf(stderr,"Error, argument %c not recognized.\n",optopt);\r
- fprintf(stderr,"%s\n", usage_str);\r
- exit(1);\r
- default:\r
- fprintf(stderr, "Argument was %c\n", optopt);\r
- fprintf(stderr,"Invalid arguments\n");\r
- fprintf(stderr,"%s\n", usage_str);\r
- exit(1);\r
- }\r
- }\r
- argc -= optind;\r
- argv += optind;\r
- for (int i = 0; i < argc; ++i) {\r
- if((schema_file_name == "") && !hfta_only){\r
- schema_file_name = argv[i];\r
- }else{\r
- input_file_names.push_back(argv[i]);\r
- }\r
- }\r
-\r
- if(input_file_names.size() == 0){\r
- fprintf(stderr,"%s\n", usage_str);\r
- exit(1);\r
- }\r
-\r
- if(clean_make){\r
- string clean_cmd = "rm Makefile hfta_*.cc";\r
- int clean_ret = system(clean_cmd.c_str());\r
- if(clean_ret){\r
- fprintf(stderr,"Warning, return value %d when trying to clean out old source files.\n", clean_ret);\r
- }\r
- }\r
-\r
-\r
- nic_prop_db *npdb = new nic_prop_db(config_dir_path);\r
-\r
-// Open globally used file names.\r
-\r
- // prepend config directory to schema file\r
- schema_file_name = config_dir_path + schema_file_name;\r
- external_fcns_path = config_dir_path + string("external_fcns.def");\r
- string ifx_fname = config_dir_path + string("ifres.xml");\r
-\r
-// Find interface query file(s).\r
- if(hostname == ""){\r
- gethostname(tmpstr,TMPSTRLEN);\r
- hostname = tmpstr;\r
- }\r
- hostname_len = hostname.length(); // tmpstr is uninitialized when -h was given\r
- string ifq_fname = config_dir_path + (distributed_mode ? "cluster" : hostname) + string(".ifq");\r
- vector<string> ifq_fls;\r
-\r
- ifq_fls.push_back(ifq_fname);\r
-\r
-\r
-// Get the field list, if it exists\r
- string flist_fl = config_dir_path + "field_list.xml";\r
- FILE *flf_in = NULL;\r
- if((flf_in = fopen(flist_fl.c_str(), "r")) != NULL) {\r
- fprintf(stderr,"Found field list file %s\n",flist_fl.c_str());\r
- xml_leaves = new xml_t();\r
- xmlParser_setfileinput(flf_in);\r
- if(xmlParserparse()){\r
- fprintf(stderr,"WARNING, could not parse field list file %s\n",flist_fl.c_str());\r
- }else{\r
- field_verifier = new field_list(xml_leaves);\r
- }\r
- }\r
-\r
- if(!hfta_only){\r
- if((table_schemas_in = fopen(schema_file_name.c_str(), "r")) == NULL) {\r
- fprintf(stderr,"Can't open schema file %s\n%s\n",schema_file_name.c_str(),strerror(errno));\r
- exit(1);\r
- }\r
- }\r
-\r
-/*\r
- if(!(debug_only || hfta_only)){\r
- if((lfta_out = fopen("lfta.c","w")) == NULL){\r
- fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));\r
- exit(1);\r
- }\r
- }\r
-*/\r
-\r
-// Get the output specification file.\r
-// format is\r
-// query, operator, operator_param, directory, bucketwidth, partitioning_fields, n_partitions\r
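-// A hypothetical example line (for illustration only):\r
-//   flow_counts,file,,./output,300,srcIP,1\r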
- string ospec_fl = "output_spec.cfg";\r
- FILE *osp_in = NULL;\r
- vector<ospec_str *> output_specs;\r
- multimap<string, int> qname_to_ospec;\r
- if((osp_in = fopen(ospec_fl.c_str(), "r")) != NULL) {\r
- char *flds[MAXFLDS];\r
- int o_lineno = 0;\r
- while(fgets(tmpstr,TMPSTRLEN,osp_in)){\r
- o_lineno++;\r
- int nflds = split_string(tmpstr,',',flds,MAXFLDS);\r
- if(nflds == 7){\r
-// make operator type lowercase\r
- char *tmpc;\r
- for(tmpc=flds[1];*tmpc!='\0';++tmpc)\r
- *tmpc = tolower(*tmpc);\r
-\r
- ospec_str *tmp_ospec = new ospec_str();\r
- tmp_ospec->query = flds[0];\r
- tmp_ospec->operator_type = flds[1];\r
- tmp_ospec->operator_param = flds[2];\r
- tmp_ospec->output_directory = flds[3];\r
- tmp_ospec->bucketwidth = atoi(flds[4]);\r
- tmp_ospec->partitioning_flds = flds[5];\r
- tmp_ospec->n_partitions = atoi(flds[6]);\r
- qname_to_ospec.insert(pair<string,int>(tmp_ospec->query,output_specs.size()));\r
- output_specs.push_back(tmp_ospec);\r
- }else{\r
- fprintf(stderr,"Warning, line %d corrupted in output_spec.cfg, has %d fields.\n",o_lineno,nflds);\r
- }\r
- }\r
- fclose(osp_in);\r
- }else{\r
- fprintf(stderr,"output_spec.cfg not found. The query set has no output. exiting.\n");\r
- exit(1);\r
- }\r
-\r
-// hfta parallelism\r
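-// Each line of hfta_parallelism.cfg is "hfta_name,parallelism"; the parallelism\r
-// must be between 1 and n_virtual_interfaces and must divide it evenly.\r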
- string pspec_fl = "hfta_parallelism.cfg";\r
- FILE *psp_in = NULL;\r
- map<string, int> hfta_parallelism;\r
- if((psp_in = fopen(pspec_fl.c_str(), "r")) != NULL){\r
- char *flds[MAXFLDS];\r
- int o_lineno = 0;\r
- while(fgets(tmpstr,TMPSTRLEN,psp_in)){\r
- bool good_entry = true;\r
- o_lineno++;\r
- int nflds = split_string(tmpstr,',',flds,MAXFLDS);\r
- if(nflds == 2){\r
- string hname = flds[0];\r
- int par = atoi(flds[1]);\r
- if(par <= 0 || par > n_virtual_interfaces){\r
- fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must be between 1 and %d\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);\r
- good_entry = false;\r
- }\r
- if(good_entry && n_virtual_interfaces % par != 0){\r
- fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must divide the number of virtual interfaces (%d), ignoring.\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);\r
- good_entry = false;\r
- }\r
- if(good_entry)\r
- hfta_parallelism[hname] = par;\r
- }\r
- }\r
- }else{\r
- fprintf(stderr,"WARNING, no file %s found, using single copies of hftas.\n",pspec_fl.c_str());\r
- }\r
-\r
-\r
-// LFTA hash table sizes\r
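-// Each line of lfta_htsize.cfg is "lfta_name,hash_table_size"; the size must be positive.\r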
- string htspec_fl = "lfta_htsize.cfg";\r
- FILE *htsp_in = NULL;\r
- map<string, int> lfta_htsize;\r
- if((htsp_in = fopen(htspec_fl.c_str(), "r")) != NULL){\r
- char *flds[MAXFLDS];\r
- int o_lineno = 0;\r
- while(fgets(tmpstr,TMPSTRLEN,htsp_in)){\r
- bool good_entry = true;\r
- o_lineno++;\r
- int nflds = split_string(tmpstr,',',flds,MAXFLDS);\r
- if(nflds == 2){\r
- string lfta_name = flds[0];\r
- int htsz = atoi(flds[1]);\r
- if(htsz>0){\r
- lfta_htsize[lfta_name] = htsz;\r
- }else{\r
- fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, htsize is %d, must be larger than 0.\n",o_lineno,htspec_fl.c_str(),htsz);\r
- }\r
- }\r
- }\r
- }else{\r
- fprintf(stderr,"WARNING, no file %s found, using default LFTA hash table sizes.\n",htspec_fl.c_str());\r
- }\r
-\r
-// LFTA virtual interface hash split\r
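-// Each line of rts_load.cfg is "interface_name,weight1,weight2,..."; the weights\r
-// must be positive and are stored as running (cumulative) sums in rts_hload.\r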
- string rtlspec_fl = "rts_load.cfg";\r
- FILE *rtl_in = NULL;\r
- map<string, vector<int> > rts_hload;\r
- if((rtl_in = fopen(rtlspec_fl.c_str(), "r")) != NULL){\r
- char *flds[MAXFLDS];\r
- int r_lineno = 0;\r
- string iface_name;\r
- vector<int> hload;\r
- while(fgets(tmpstr,TMPSTRLEN,rtl_in)){\r
- bool good_entry = true;\r
- r_lineno++;\r
- iface_name = "";\r
- hload.clear();\r
- int nflds = split_string(tmpstr,',',flds,MAXFLDS);\r
- if(nflds >1){\r
- iface_name = flds[0];\r
- int cumm_h = 0;\r
- int j;\r
- for(j=1;j<nflds;++j){\r
- int h = atoi(flds[j]);\r
- if(h<=0)\r
- good_entry = false;\r
- cumm_h += h;\r
- hload.push_back(cumm_h);\r
- }\r
- }else{\r
- good_entry = false;\r
- }\r
- if(good_entry){\r
- rts_hload[iface_name] = hload;\r
- }else{\r
- fprintf(stderr,"Warning, line %d of %s is malformed, skipping.\n",r_lineno, rtlspec_fl.c_str());\r
- }\r
- }\r
- }\r
-\r
-\r
-\r
- if(output_query_names){\r
- if((query_name_output = fopen("query_names.txt","w")) == NULL){\r
- fprintf(stderr,"Can't open output file %s\n%s\n","query_names.txt",strerror(errno));\r
- exit(1);\r
- }\r
- }\r
-\r
- if(output_schema_summary){\r
- if((schema_summary_output = fopen("schema_summary.txt","w")) == NULL){\r
- fprintf(stderr,"Can't open output file %s\n%s\n","schema_summary.txt",strerror(errno));\r
- exit(1);\r
- }\r
- }\r
-\r
- if((qtree_output = fopen("qtree.xml","w")) == NULL){\r
- fprintf(stderr,"Can't open output file %s\n%s\n","qtree.xml",strerror(errno));\r
- exit(1);\r
- }\r
- fprintf(qtree_output,"<?xml version='1.0' encoding='ISO-8859-1'?>\n");\r
- fprintf(qtree_output,"<?xml-stylesheet type='text/xsl' href='qtree.xsl'?>\n");\r
- fprintf(qtree_output,"<QueryNodes>\n");\r
-\r
-\r
-// Get an initial Schema\r
- table_list *Schema;\r
- if(!hfta_only){\r
-// Parse the table schema definitions.\r
- fta_parse_result = new fta_parse_t();\r
- FtaParser_setfileinput(table_schemas_in);\r
- if(FtaParserparse()){\r
- fprintf(stderr,"Table schema parse failed.\n");\r
- exit(1);\r
- }\r
- if(fta_parse_result->parse_type != TABLE_PARSE){\r
- fprintf(stderr,"ERROR, file %s is not a table definition file.\n",schema_file_name.c_str());\r
- exit(1);\r
- }\r
- Schema = fta_parse_result->tables;\r
-\r
-// Process schema field inheritance\r
- int retval;\r
- retval = Schema->unroll_tables(err_str);\r
- if(retval){\r
- fprintf(stderr,"Error processing schema filed inheritance:\n %s\n", err_str.c_str() );\r
- exit(1);\r
- }\r
- }else{\r
-// hfta only => we will try to fetch schemas from the registry.\r
-// therefore, start off with an empty schema.\r
- Schema = new table_list();\r
- }\r
-\r
-\r
-// Open and parse the external functions file.\r
- Ext_fcnsParserin = fopen(external_fcns_path.c_str(),"r");\r
- if(Ext_fcnsParserin == NULL){\r
- fprintf(stderr,"Warning, can't find external functions definition file (external_fcns.def).\n");\r
- Ext_fcns = new ext_fcn_list();\r
- }else{\r
- if(Ext_fcnsParserparse()){\r
- fprintf(stderr,"Warning, error parsing external functions definition file (external_fcns.def).\n");\r
- Ext_fcns = new ext_fcn_list();\r
- }\r
- }\r
- if(Ext_fcns->validate_fcns(err_str)){\r
- fprintf(stderr,"Error processing external functions definition file (external_fcns.def):\n%s\n",err_str.c_str());\r
- exit(1);\r
- }\r
-\r
-// Open and parse the interface resources file.\r
-// ifq_t *ifaces_db = new ifq_t();\r
-// string ierr;\r
-// if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){\r
-// fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",\r
-// ifx_fname.c_str(), ierr.c_str());\r
-// exit(1);\r
-// }\r
-// if(ifaces_db->load_ifqs(ifq_fname, ierr)){\r
-// fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",\r
-// ifq_fname.c_str(), ierr.c_str());\r
-// exit(1);\r
-// }\r
-\r
-\r
-// The LFTA code string.\r
-// Put the standard preamble here.\r
-// NOTE: the hash macros, fcns should go into the run time\r
- map<string, string> lfta_val;\r
- map<string, string> lfta_prefilter_val;\r
-\r
- string lfta_header =\r
-"#include <limits.h>\n\n"\r
-"#include \"rts.h\"\n"\r
-"#include \"fta.h\"\n"\r
-"#include \"lapp.h\"\n"\r
-"#include \"rts_udaf.h\"\n\n"\r
-;\r
-// Get any locally defined parsing headers\r
- glob_t glob_result;\r
- memset(&glob_result, 0, sizeof(glob_result));\r
-\r
- // do the glob operation\r
- int return_value = glob("../../include/lfta/local/*h", GLOB_TILDE, NULL, &glob_result);\r
- if(return_value == 0){\r
- for(size_t i = 0; i < glob_result.gl_pathc; ++i) {\r
- char *flds[1000];\r
- int nflds = split_string(glob_result.gl_pathv[i],'/',flds,1000);\r
- lfta_header += "#include \"local/"+string(flds[nflds-1])+"\"\n\n";\r
- }\r
- }else{\r
- fprintf(stderr,"Warning, glob on ../../include/lfta/local/*h failed.\n");\r
- }\r
-\r
-/*\r
-"#define IS_FILLED(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> ((bucket & 15)<<1)))\n"\r
-"#define IS_NEW(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> (((bucket & 15) << 1) + 1)))\n"\r
-"#define SET_EMPTY(bitmap,bucket) (bitmap[bucket >> 4] &= (~(0x80000000 >> ((bucket & 15)<<1))))\n"\r
-"#define SET_FILLED_AND_NEW(bitmap,bucket) (bitmap[bucket >> 4] |= (0xC0000000 >> ((bucket & 15)<<1)))\n"\r
-*/\r
-\r
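-// The generated preamble declares the lfta_prefilter hook, slot and Bloom-filter\r
-// bit-manipulation macros (bf_num bits per bucket, packed 8 per byte), and the\r
-// per-type *_to_hash helpers used by the generated LFTA code.\r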
- lfta_header += \r
-"\n"\r
-"gs_uint64_t (*lfta_prefilter)(void *pkt) = NULL; // set at fta_init\n"\r
-"\n"\r
-"#define SLOT_FILLED 0x04\n"\r
-"#define SLOT_GEN_BITS 0x03\n"\r
-"#define SLOT_HASH_BITS 0xfffffff8\n"\r
-"#define SET_BF_BIT(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num)) + (bf_index)) >> 3] |= (0x80 >> ((((bucket) * (bf_num)) + (bf_index)) & 7)))\n"\r
-"#define IS_BF_SET(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num))+(bf_index)) >> 3] & (0x80 >> ((((bucket) * (bf_num))+(bf_index)) & 7)))\n"\r
-"#define SET_BF_EMPTY(bf_table, bf_num, bf_index, bucket) (bf_table[((bucket * bf_num)+bf_index) >> 3] &= (~0x80 >> (((bucket * bf_num)+bf_index) & 7)))\n"\r
-"\n\n"\r
-\r
-"#define lfta_BOOL_to_hash(x) (x)\n"\r
-"#define lfta_USHORT_to_hash(x) (x)\n"\r
-"#define lfta_UINT_to_hash(x) (x)\n"\r
-"#define lfta_IP_to_hash(x) (x)\n"\r
-"#define lfta_IPV6_to_hash(x) ( (x.v[0]) ^ (x.v[1]) ^ (x.v[2]) ^ (x.v[3]))\n"\r
-"#define lfta_INT_to_hash(x) (gs_uint32_t)(x)\n"\r
-"#define lfta_ULLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"\r
-"#define lfta_LLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"\r
-"#define lfta_FLOAT_to_hash(x) ( (( ((gs_uint64_t)(x)) >>32)&0xffffffff) ^ (((gs_uint64_t)(x))&0xffffffff) )\n"\r
-"static gs_uint32_t lfta_V_STR_to_hash(struct gs_string x){\n"\r
-" gs_uint32_t i,ret=0,tmp_sum = 0;\n"\r
-" for(i=0;i<x.length;++i){\n"\r
-" tmp_sum |= (x.data[i]) << (8*(i%4));\n"\r
-" if((i%4) == 3){\n"\r
-" ret ^= tmp_sum;\n"\r
-" tmp_sum = 0;\n"\r
-" }\n"\r
-" }\n"\r
-" if((i%4)!=0) ret ^=tmp_sum;\n"\r
-" return(ret);\n"\r
-"}\n\n\n";\r
-\r
-\r
-\r
-//////////////////////////////////////////////////////////////////\r
-///// Get all of the query parse trees\r
-\r
-\r
- int i,p;\r
- int hfta_count = 0; // for numeric suffixes to hfta .cc files\r
-\r
-//---------------------------\r
-// Global info needed for post processing.\r
-\r
-// Set of operator views ref'd in the query set.\r
- opview_set opviews;\r
-// lftas on a per-machine basis.\r
- map<string, vector<stream_query *> > lfta_mach_lists;\r
- int nfiles = input_file_names.size();\r
- vector<stream_query *> hfta_list; // list of hftas.\r
- map<string, stream_query *> sq_map; // map from query name to stream query.\r
-\r
-\r
-//////////////////////////////////////////\r
-\r
-// Open and parse the interface resources file.\r
- ifq_t *ifaces_db = new ifq_t();\r
- string ierr;\r
- if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){\r
- fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",\r
- ifx_fname.c_str(), ierr.c_str());\r
- exit(1);\r
- }\r
- if(ifaces_db->load_ifqs(ifq_fls[0], ierr)){\r
- fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",\r
- ifq_fls[0].c_str(), ierr.c_str());\r
- exit(1);\r
- }\r
-\r
- map<string, string> qname_to_flname; // for detecting duplicate query names\r
-\r
-\r
-\r
-// Parse the files to create a vector of parse trees.\r
-// Load qnodes with information to perform a topo sort\r
-// based on query dependencies.\r
- vector<query_node *> qnodes; // for topo sort.\r
- map<string,int> name_node_map; // map query name to qnodes entry\r
- for(i=0;i<input_file_names.size();i++){\r
-\r
- if((fta_in = fopen(input_file_names[i].c_str(), "r")) == NULL) {\r
- fprintf(stderr,"Can't open input file %s\n%s",input_file_names[i].c_str(),strerror(errno));\r
- continue;\r
- }\r
-fprintf(stderr,"Parsing file %s\n",input_file_names[i].c_str());\r
-\r
-// Parse the FTA query\r
- fta_parse_result = new fta_parse_t();\r
- FtaParser_setfileinput(fta_in);\r
- if(FtaParserparse()){\r
- fprintf(stderr,"FTA parse failed.\n");\r
- exit(1);\r
- }\r
- if(fta_parse_result->parse_type != QUERY_PARSE){\r
- fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());\r
- exit(1);\r
- }\r
-\r
-// returns a list of parse trees\r
- vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;\r
- for(p=0;p<qlist.size();++p){\r
- table_exp_t *fta_parse_tree = qlist[p];\r
-// query_parse_trees.push_back(fta_parse_tree);\r
-\r
-// compute the default name -- extract from query name\r
- strcpy(tmpstr,input_file_names[i].c_str());\r
- char *qname = strrchr(tmpstr,PATH_DELIM);\r
- if(qname == NULL)\r
- qname = tmpstr;\r
- else\r
- qname++;\r
- char *qname_end = strchr(qname,'.');\r
- if(qname_end != NULL) *qname_end = '\0';\r
- string qname_str = qname;\r
- string imputed_qname = impute_query_name(fta_parse_tree, qname_str);\r
-\r
-// Determine visibility. Should I be attaching all of the output methods?\r
- if(qname_to_ospec.count(imputed_qname)>0)\r
- fta_parse_tree->set_visible(true);\r
- else\r
- fta_parse_tree->set_visible(false);\r
-\r
-\r
-// Create a manipulable representation of the parse tree.\r
-// the qnode inherits the visibility assigned to the parse tree.\r
- int pos = qnodes.size();\r
- qnodes.push_back( new query_node(pos, imputed_qname, input_file_names[i], fta_parse_tree ));\r
- name_node_map[ qnodes[pos]->name ] = pos;\r
-//printf("name_node_map[ %s ] = %d\n",qnodes[pos]->name.c_str(),pos);\r
-// qnames.push_back(impute_query_name(fta_parse_tree, qname_str));\r
-// qfiles.push_back(i);\r
-\r
-// Check for duplicate query names\r
-// NOTE : in hfta-only generation, I should\r
-// also check with the names of the registered queries.\r
- if(qname_to_flname.count(qnodes[pos]->name) > 0){\r
- fprintf(stderr,"ERROR duplicate query name %s in files %s and %s.\n",\r
- qnodes[pos]->name.c_str(), (qname_to_flname[qnodes[pos]->name]).c_str(), input_file_names[i].c_str());\r
- exit(1);\r
- }\r
- if(Schema->find_tbl(qnodes[pos]->name) >= 0){\r
- fprintf(stderr,"ERROR duplicate query name %s (file %s) was already defined as a PROTOCOL.\n",\r
- qnodes[pos]->name.c_str(), input_file_names[i].c_str());\r
- exit(1);\r
- }\r
- qname_to_flname[qnodes[pos]->name] = input_file_names[i].c_str();\r
-\r
-\r
- }\r
- }\r
-\r
-// Add the library queries\r
-\r
- int pos;\r
- for(pos=0;pos<qnodes.size();++pos){\r
- int fi;\r
- for(fi = 0;fi<qnodes[pos]->refd_tbls.size();++fi){\r
- string src_tbl = qnodes[pos]->refd_tbls[fi];\r
- if(qname_to_flname.count(src_tbl) == 0){\r
- int last_sep = src_tbl.find_last_of('/');\r
- if(last_sep != string::npos){\r
-fprintf(stderr,"Finding library query %s for query %s\n",src_tbl.c_str(),qnodes[pos]->name.c_str());\r
- string target_qname = src_tbl.substr(last_sep+1);\r
- string qpathname = library_path + src_tbl + ".gsql";\r
- if((fta_in = fopen(qpathname.c_str(), "r")) == NULL) {\r
- fprintf(stderr,"Can't open library file %s, referenced by query %s in file %s\n\t%s\n",qpathname.c_str(),qnodes[pos]->name.c_str(), qname_to_flname[qnodes[pos]->name].c_str(), strerror(errno));\r
- exit(1);\r
- fprintf(stderr,"After exit\n");\r
- }\r
-fprintf(stderr,"Parsing file %s\n",qpathname.c_str());\r
-// Parse the FTA query\r
- fta_parse_result = new fta_parse_t();\r
- FtaParser_setfileinput(fta_in);\r
- if(FtaParserparse()){\r
- fprintf(stderr,"FTA parse failed.\n");\r
- exit(1);\r
- }\r
- if(fta_parse_result->parse_type != QUERY_PARSE){\r
- fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());\r
- exit(1);\r
- }\r
-\r
- map<string, int> local_query_map;\r
- vector<string> local_query_names;\r
- vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;\r
- for(p=0;p<qlist.size();++p){\r
- table_exp_t *fta_parse_tree = qlist[p];\r
- fta_parse_tree->set_visible(false); // assumed to not produce output\r
- string imputed_qname = impute_query_name(fta_parse_tree, target_qname);\r
- if(imputed_qname == target_qname)\r
- imputed_qname = src_tbl;\r
- if(local_query_map.count(imputed_qname)>0){\r
- fprintf(stderr,"ERROR, multiple queries named %s in library file %s\n",imputed_qname.c_str(), qpathname.c_str());\r
- exit(1);\r
- }\r
- local_query_map[ imputed_qname ] = p;\r
- local_query_names.push_back(imputed_qname);\r
- }\r
-\r
- if(local_query_map.count(src_tbl)==0){\r
- fprintf(stderr,"ERROR, library query file %s has no query named %s\n",qpathname.c_str(),target_qname.c_str());\r
- exit(1);\r
- }\r
-\r
- vector<int> worklist;\r
- set<int> added_queries;\r
- vector<query_node *> new_qnodes;\r
- worklist.push_back(local_query_map[src_tbl]); // the target query is keyed by src_tbl above\r
- added_queries.insert(local_query_map[src_tbl]);\r
- int qq;\r
- int qpos = qnodes.size();\r
- for(qq=0;qq<worklist.size();++qq){\r
- int q_id = worklist[qq];\r
- query_node *new_qnode = new query_node(qpos+qq, local_query_names[q_id], qpathname, qlist[q_id] );\r
- new_qnodes.push_back( new_qnode);\r
- vector<string> refd_tbls = new_qnode->refd_tbls;\r
- int ff;\r
- for(ff = 0;ff<refd_tbls.size();++ff){\r
- if(local_query_map.count(refd_tbls[ff])>0 && added_queries.count(local_query_map[refd_tbls[ff]])==0){\r
-\r
- if(name_node_map.count(refd_tbls[ff])>0){\r
- fprintf(stderr,"ERROR, query %s occurs both in the regular query set, file %s, and in library file %s\n",refd_tbls[ff].c_str(), qname_to_flname[refd_tbls[ff]].c_str(), qpathname.c_str() );\r
- exit(1);\r
- }else{\r
- worklist.push_back(local_query_map[refd_tbls[ff]]);\r
- }\r
- }\r
- }\r
- }\r
-\r
- for(qq=0;qq<new_qnodes.size();++qq){\r
- int qpos = qnodes.size();\r
- qnodes.push_back(new_qnodes[qq]);\r
- name_node_map[qnodes[qpos]->name ] = qpos;\r
- qname_to_flname[qnodes[qpos]->name ] = qpathname;\r
- }\r
- }\r
- }\r
- }\r
- }\r
-\r
-\r
-\r
-\r
-\r
-\r
-\r
-\r
-//---------------------------------------\r
-\r
-\r
-// Add the UDOPS.\r
-\r
- string udop_missing_sources;\r
- for(i=0;i<qnodes.size();++i){\r
- int fi;\r
- for(fi = 0;fi<qnodes[i]->refd_tbls.size();++fi){\r
- int sid = Schema->find_tbl(qnodes[i]->refd_tbls[fi]);\r
- if(sid >= 0){\r
- if(Schema->get_schema_type(sid) == OPERATOR_VIEW_SCHEMA){\r
- if(name_node_map.count(qnodes[i]->refd_tbls[fi]) == 0){\r
- int pos = qnodes.size();\r
- qnodes.push_back( new query_node(pos, qnodes[i]->refd_tbls[fi], Schema));\r
- name_node_map[ qnodes[pos]->name ] = pos;\r
- qnodes[pos]->is_externally_visible = false; // not externally visible (may be marked visible later if reachable from a visible root)\r
- // Need to mark the source queries as visible.\r
- int si;\r
- string missing_sources = "";\r
- for(si=0;si<qnodes[pos]->refd_tbls.size();++si){\r
- string src_tbl = qnodes[pos]->refd_tbls[si];\r
- if(name_node_map.count(src_tbl)==0){\r
- missing_sources += src_tbl + " ";\r
- }\r
- }\r
- if(missing_sources != ""){\r
- udop_missing_sources += "\tUDOP "+qnodes[pos]->name+" references undefined tables "+missing_sources+"\n";\r
- }\r
- }\r
- }\r
- }\r
- }\r
- }\r
- if(udop_missing_sources != ""){\r
- fprintf(stderr,"ERROR, User Defined OPerators reference source tables that are not part of the query set:\n%s",udop_missing_sources.c_str());\r
- exit(1);\r
- }\r
-\r
-\r
-\r
-////////////////////////////////////////////////////////////////////\r
-/// Check parse trees to verify that some\r
-/// global properties are met :\r
-/// if q1 reads from q2, then\r
-/// q2 is processed before q1\r
-/// q1 can supply q2's parameters\r
-/// Verify there is no cycle in the reads-from graph.\r
-\r
-// Compute an order in which to process the\r
-// queries.\r
-\r
-// Start by building the reads-from lists.\r
-//\r
-\r
- for(i=0;i<qnodes.size();++i){\r
- int qi, fi;\r
- vector<string> refd_tbls = qnodes[i]->refd_tbls;\r
- for(fi = 0;fi<refd_tbls.size();++fi){\r
- if(name_node_map.count(refd_tbls[fi])>0){\r
-//printf("query %d (%s) reads from %s (%d)\n", i, qnodes[i]->name.c_str(),refd_tbls[fi].c_str(),name_node_map[refd_tbls[fi]]);\r
- (qnodes[i]->reads_from).insert(name_node_map[refd_tbls[fi]]);\r
- }\r
- }\r
- }\r
-\r
-\r
-// If one query reads the result of another,\r
-// check for parameter compatibility. Currently it must\r
-// be an exact match. I will move to requiring\r
-// containment after re-ordering, but will require\r
-// some analysis for code generation which is not\r
-// yet in place.\r
-//printf("There are %d query nodes.\n",qnodes.size());\r
-\r
-\r
- for(i=0;i<qnodes.size();++i){\r
- vector<var_pair_t *> target_params = qnodes[i]->params;\r
- for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){\r
- vector<var_pair_t *> source_params = qnodes[(*si)]->params;\r
- if(target_params.size() != source_params.size()){\r
- fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());\r
- exit(1);\r
- }\r
- int p;\r
- for(p=0;p<target_params.size();++p){\r
- if(! (target_params[p]->name == source_params[p]->name &&\r
- target_params[p]->val == source_params[p]->val ) ){\r
- fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());\r
- exit(1);\r
- }\r
- }\r
- }\r
- }\r
-\r
-\r
-// Find the roots.\r
-// Start by counting inedges.\r
- for(i=0;i<qnodes.size();++i){\r
- for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){\r
- qnodes[(*si)]->n_consumers++;\r
- }\r
- }\r
-\r
-// The roots are the nodes with indegree zero.\r
- set<int> roots;\r
- for(i=0;i<qnodes.size();++i){\r
- if(qnodes[i]->n_consumers == 0){\r
- if(qnodes[i]->is_externally_visible == false){\r
- fprintf(stderr,"WARNING: query %s (file %s) is a root query but it isn't visible. Ignored.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str());\r
- }\r
- roots.insert(i);\r
- }\r
- }\r
-\r
-// Remove the parts of the subtree that produce no output.\r
- set<int> valid_roots;\r
- set<int> discarded_nodes;\r
- set<int> candidates;\r
- while(roots.size() >0){\r
- for(si=roots.begin();si!=roots.end();++si){\r
- if(qnodes[(*si)]->is_externally_visible){\r
- valid_roots.insert((*si));\r
- }else{\r
- discarded_nodes.insert((*si));\r
- set<int>::iterator sir;\r
- for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){\r
- qnodes[(*sir)]->n_consumers--;\r
- if(qnodes[(*sir)]->n_consumers == 0)\r
- candidates.insert( (*sir));\r
- }\r
- }\r
- }\r
- roots = candidates;\r
- candidates.clear();\r
- }\r
- roots = valid_roots;\r
- if(discarded_nodes.size()>0){\r
- fprintf(stderr,"Warning, the following queries were discarded because they produce no output:\n");\r
- int di = 0;\r
- for(si=discarded_nodes.begin();si!=discarded_nodes.end();++si){\r
- if(di>0 && (di%8)==0) fprintf(stderr,"\n");\r
- di++;\r
- fprintf(stderr," %s",qnodes[(*si)]->name.c_str());\r
- }\r
- fprintf(stderr,"\n");\r
- }\r
-\r
-// Compute the sources_to set, ignoring discarded nodes.\r
- for(i=0;i<qnodes.size();++i){\r
- if(discarded_nodes.count(i)==0)\r
- for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){\r
- qnodes[(*si)]->sources_to.insert(i);\r
- }\r
- }\r
-\r
-\r
-// Find the nodes that are shared by multiple visible subtrees.\r
-// The roots become inferred visible nodes.\r
-\r
-// Find the visible nodes.\r
- vector<int> visible_nodes;\r
- for(i=0;i<qnodes.size();i++){\r
- if(qnodes[i]->is_externally_visible){\r
- visible_nodes.push_back(i);\r
- }\r
- }\r
-\r
-// Find UDOPs referenced by visible nodes.\r
- list<int> workq;\r
- for(i=0;i<visible_nodes.size();++i){\r
- workq.push_back(visible_nodes[i]);\r
- }\r
- while(!workq.empty()){\r
- int node = workq.front();\r
- workq.pop_front();\r
- set<int>::iterator children;\r
- if(qnodes[node]->is_udop && qnodes[node]->is_externally_visible == false){\r
- qnodes[node]->is_externally_visible = true;\r
- visible_nodes.push_back(node);\r
- for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){\r
- if(qnodes[(*children)]->is_externally_visible == false){\r
- qnodes[(*children)]->is_externally_visible = true;\r
- visible_nodes.push_back((*children));\r
- }\r
- }\r
- }\r
- for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){\r
- workq.push_back((*children));\r
- }\r
- }\r
-\r
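-// Iterate to a fixpoint: each pass may mark new inferred-visible nodes,\r
-// which changes the subtree computation on the next pass.\r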
- bool done = false;\r
- while(!done){\r
-// reset the nodes\r
- for(i=0;i<qnodes.size();i++){\r
- qnodes[i]->subtree_roots.clear();\r
- }\r
-\r
-// Walk the tree defined by a visible node, not descending into\r
-// subtrees rooted by a visible node. Mark the node visited with\r
-// the visible node ID.\r
- for(i=0;i<visible_nodes.size();++i){\r
- set<int> vroots;\r
- vroots.insert(visible_nodes[i]);\r
- while(vroots.size()>0){\r
- for(si=vroots.begin();si!=vroots.end();++si){\r
- qnodes[(*si)]->subtree_roots.insert(visible_nodes[i]);\r
-\r
- set<int>::iterator sir;\r
- for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){\r
- if(! qnodes[(*sir)]->is_externally_visible){\r
- candidates.insert( (*sir));\r
- }\r
- }\r
- }\r
- vroots = candidates;\r
- candidates.clear();\r
- }\r
- }\r
-// Find the nodes in multiple visible node subtrees, but with no parent\r
-// that is in multiple visible node subtrees. Mark these as inferred visible nodes.\r
- done = true; // until proven otherwise\r
- for(i=0;i<qnodes.size();i++){\r
- if(qnodes[i]->subtree_roots.size()>1){\r
- bool is_new_root = true;\r
- set<int>::iterator sir;\r
- for(sir=qnodes[i]->sources_to.begin(); sir!=qnodes[i]->sources_to.end();++sir){\r
- if(qnodes[(*sir)]->subtree_roots.size()>1)\r
- is_new_root = false;\r
- }\r
- if(is_new_root){\r
- qnodes[i]->is_externally_visible = true;\r
- qnodes[i]->inferred_visible_node = true;\r
- visible_nodes.push_back(i);\r
- done = false;\r
- }\r
- }\r
- }\r
- }\r
-\r
-\r
-\r
-\r
-\r
-// get visible nodes in topo ordering.\r
-// for(i=0;i<qnodes.size();i++){\r
-// qnodes[i]->n_consumers = qnodes[i]->sources_to.size();\r
-// }\r
- vector<int> process_order;\r
- while(roots.size() >0){\r
- for(si=roots.begin();si!=roots.end();++si){\r
- if(discarded_nodes.count((*si))==0){\r
- process_order.push_back( (*si) );\r
- }\r
- set<int>::iterator sir;\r
- for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){\r
- qnodes[(*sir)]->n_consumers--;\r
- if(qnodes[(*sir)]->n_consumers == 0)\r
- candidates.insert( (*sir));\r
- }\r
- }\r
- roots = candidates;\r
- candidates.clear();\r
- }\r
-\r
-\r
-//printf("process_order.size() =%d\n",process_order.size());\r
-\r
-// Search for cyclic dependencies\r
- string found_dep;\r
- for(i=0;i<qnodes.size();++i){\r
- if(discarded_nodes.count(i)==0 && qnodes[i]->n_consumers > 0){\r
- if(found_dep.size() != 0) found_dep += ", ";\r
- found_dep += "query "+qnodes[i]->name+" (file "+qnodes[i]->file+")";\r
- }\r
- }\r
- if(found_dep.size()>0){\r
- fprintf(stderr,"ERROR, the following queries contain a cyclic reads-from dependency:\n%s\n",found_dep.c_str());\r
- exit(1);\r
- }\r
-\r
-// Get a list of query sets, in the order to be processed.\r
-// Start at visible root and do bfs.\r
-// The query set includes queries referenced indirectly,\r
-// as sources for user-defined operators. These are needed\r
-// to ensure that they are added to the schema, but are not part\r
-// of the query tree.\r
-\r
-// stream_node_sets contains queries reachable only through the\r
-// FROM clause, so I can tell which queries to add to the stream\r
-// query. (DISABLED, UDOPS are integrated, does this cause problems?)\r
-\r
-// NOTE: this code works because in order for data to be\r
-// read by multiple hftas, the node must be externally visible.\r
-// But visible nodes define roots of process sets.\r
-// internally visible nodes can feed data only\r
-// to other nodes in the same query file.\r
-// Therefore, any access can be restricted to a file,\r
-// hfta output sharing is done only on roots\r
-// never on interior nodes.\r
-\r
-\r
-\r
-\r
-// Compute the base collection of hftas.\r
- vector<hfta_node *> hfta_sets;\r
- map<string, int> hfta_name_map;\r
-// vector< vector<int> > process_sets;\r
-// vector< set<int> > stream_node_sets;\r
- reverse(process_order.begin(), process_order.end()); // get listing in reverse order.\r
- // i.e. process leaves 1st.\r
- for(i=0;i<process_order.size();++i){\r
- if(qnodes[process_order[i]]->is_externally_visible == true){\r
-//printf("Visible.\n");\r
- int root = process_order[i];\r
- hfta_node *hnode = new hfta_node();\r
- hnode->name = qnodes[root]-> name;\r
- hnode->source_name = qnodes[root]-> name;\r
- hnode->is_udop = qnodes[root]->is_udop;\r
- hnode->inferred_visible_node = qnodes[root]->inferred_visible_node;\r
-\r
- vector<int> proc_list; proc_list.push_back(root);\r
-// Ensure that nodes are added only once.\r
- set<int> proc_set; proc_set.insert(root);\r
- roots.clear(); roots.insert(root);\r
- candidates.clear();\r
- while(roots.size()>0){\r
- for(si=roots.begin();si!=roots.end();++si){\r
-//printf("Processing root %d\n",(*si));\r
- set<int>::iterator sir;\r
- for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){\r
-//printf("reads fom %d\n",(*sir));\r
- if(qnodes[(*sir)]->is_externally_visible==false){\r
- candidates.insert( (*sir) );\r
- if(proc_set.count( (*sir) )==0){\r
- proc_set.insert( (*sir) );\r
- proc_list.push_back( (*sir) );\r
- }\r
- }\r
- }\r
- }\r
- roots = candidates;\r
- candidates.clear();\r
- }\r
-\r
- reverse(proc_list.begin(), proc_list.end());\r
- hnode->query_node_indices = proc_list;\r
- hfta_name_map[hnode->name] = hfta_sets.size();\r
- hfta_sets.push_back(hnode);\r
- }\r
- }\r
-\r
-// Compute the reads_from / sources_to graphs for the hftas.\r
-\r
- for(i=0;i<hfta_sets.size();++i){\r
- hfta_node *hnode = hfta_sets[i];\r
- for(q=0;q<hnode->query_node_indices.size();q++){\r
- query_node *qnode = qnodes[hnode->query_node_indices[q]];\r
- for(s=0;s<qnode->refd_tbls.size();++s){\r
- if(hfta_name_map.count(qnode->refd_tbls[s])){\r
- int other_hfta = hfta_name_map[qnode->refd_tbls[s]];\r
- hnode->reads_from.insert(other_hfta);\r
- hfta_sets[other_hfta]->sources_to.insert(i);\r
- }\r
- }\r
- }\r
- }\r
-\r
-// Compute a topological sort of the hfta_sets.\r
-\r
- vector<int> hfta_topsort;\r
- workq.clear();\r
- int hnode_srcs[hfta_sets.size()];\r
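-// hnode_srcs[i] counts how many of hfta i's consumers have been emitted so far;\r
-// an hfta is emitted only after all of its consumers, giving a consumers-first order.\r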
- for(i=0;i<hfta_sets.size();++i){\r
- hnode_srcs[i] = 0;\r
- if(hfta_sets[i]->sources_to.size() == 0)\r
- workq.push_back(i);\r
- }\r
-\r
- while(! workq.empty()){\r
- int node = workq.front();\r
- workq.pop_front();\r
- hfta_topsort.push_back(node);\r
- set<int>::iterator stsi;\r
- for(stsi=hfta_sets[node]->reads_from.begin();stsi!=hfta_sets[node]->reads_from.end();++stsi){\r
- int parent = (*stsi);\r
- hnode_srcs[parent]++;\r
- if(hnode_srcs[parent] == hfta_sets[parent]->sources_to.size()){\r
- workq.push_back(parent);\r
- }\r
- }\r
- }\r
-\r
-// Decorate hfta nodes with the level of parallelism given as input.\r
-\r
- map<string, int>::iterator msii;\r
- for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){\r
- string hfta_name = (*msii).first;\r
- int par = (*msii).second;\r
- if(hfta_name_map.count(hfta_name) > 0){\r
- hfta_sets[ hfta_name_map[hfta_name] ]->n_parallel = par;\r
- }else{\r
- fprintf(stderr,"Warning, hfta_parallelism.cfg has an entry for %s, but its not a hfta.\n",hfta_name.c_str());\r
- }\r
- }\r
-\r
-// Propagate levels of parallelism: children should have a level of parallelism\r
-// at least as large as that of any of their parents. Adjust children upwards to compensate.\r
-// Start at parents and adjust children, auto-propagation will occur.\r
-\r
- for(i=hfta_sets.size()-1;i>=0;i--){\r
- set<int>::iterator stsi;\r
- for(stsi=hfta_sets[i]->reads_from.begin();stsi!=hfta_sets[i]->reads_from.end();++stsi){\r
- if(hfta_sets[i]->n_parallel > hfta_sets[ (*stsi) ]->n_parallel){\r
- hfta_sets[ (*stsi) ]->n_parallel = hfta_sets[i]->n_parallel;\r
- }\r
- }\r
- }\r
-\r
-// Before all the name mangling, check whether there are any output_spec.cfg\r
-// or hfta_parallelism.cfg entries that do not have a matching query.\r
-\r
- string dangling_ospecs = "";\r
- multimap<string, int>::iterator mosii; // qname_to_ospec is a multimap, so use a multimap iterator\r
- for(mosii=qname_to_ospec.begin();mosii!=qname_to_ospec.end();++mosii){\r
-  string oq = (*mosii).first;\r
-  if(hfta_name_map.count(oq) == 0){\r
-   dangling_ospecs += " "+(*mosii).first;\r
- }\r
- }\r
- if(dangling_ospecs!=""){\r
- fprintf(stderr,"WARNING, the following entries in output_spec.cfg don't have a matching query: %s\n",dangling_ospecs.c_str());\r
- }\r
-\r
- string dangling_par = "";\r
- for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){\r
- string oq = (*msii).first;\r
- if(hfta_name_map.count(oq) == 0){\r
- dangling_par += " "+(*msii).first;\r
- }\r
- }\r
- if(dangling_par!=""){\r
- fprintf(stderr,"WARNING, the following entries in hfta_parallelism.cfg don't have a matching query: %s\n",dangling_par.c_str());\r
- }\r
-\r
-\r
-\r
-// Replicate parallelized hftas. Do __copyX name mangling. Adjust\r
-// FROM clauses: retarget any name which is an internal node, and\r
-// any which is in hfta_sets (and which is parallelized). Add Merge nodes\r
-// when the source hfta has more parallelism than the target node.\r
-// Add new nodes to qnodes and hfta_sets with the appropriate linkages.\r
-\r
-\r
- int n_original_hfta_sets = hfta_sets.size();\r
- for(i=0;i<n_original_hfta_sets;++i){\r
- if(hfta_sets[i]->n_parallel > 1){\r
- hfta_sets[i]->do_generation =false; // set the deletion flag for this entry.\r
- set<string> local_nodes; // names of query nodes in the hfta.\r
- for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){\r
- local_nodes.insert(qnodes[ hfta_sets[i]->query_node_indices[h] ]->name);\r
- }\r
-\r
- for(p=0;p<hfta_sets[i]->n_parallel;++p){\r
- string mangler = "__copy"+int_to_string(p);\r
- hfta_node *par_hfta = new hfta_node();\r
- par_hfta->name = hfta_sets[i]->name + mangler;\r
- par_hfta->source_name = hfta_sets[i]->name;\r
- par_hfta->is_udop = hfta_sets[i]->is_udop;\r
- par_hfta->inferred_visible_node = hfta_sets[i]->inferred_visible_node;\r
- par_hfta->n_parallel = hfta_sets[i]->n_parallel;\r
- par_hfta->parallel_idx = p;\r
-\r
- map<string, int> par_qnode_map; // qnode name-to-idx, aids dependency tracking.\r
-\r
-// Is it a UDOP?\r
- if(hfta_sets[i]->is_udop){\r
- int root = hfta_sets[i]->query_node_indices[0];\r
-\r
- string unequal_par_sources;\r
- set<int>::iterator rfsii;\r
- for(rfsii=hfta_sets[i]->reads_from.begin();rfsii!=hfta_sets[i]->reads_from.end();++rfsii){\r
- if(hfta_sets[(*rfsii)]->n_parallel != par_hfta->n_parallel){\r
- unequal_par_sources = hfta_sets[(*rfsii)]->name+" ("+int_to_string(hfta_sets[(*rfsii)]->n_parallel)+") ";\r
- }\r
- }\r
- if(unequal_par_sources != ""){\r
- fprintf(stderr,"ERROR, UDOP %s has parallelism %d, but some of its sources have a different parallelism: %s\n",hfta_sets[i]->name.c_str(), hfta_sets[i]->n_parallel, unequal_par_sources.c_str());\r
- exit(1);\r
- }\r
-\r
- int rti;\r
- vector<string> new_sources;\r
- for(rti=0;rti<qnodes[root]->refd_tbls.size();++rti){\r
- new_sources.push_back(qnodes[root]->refd_tbls[rti]+mangler);\r
- }\r
-\r
- query_node *new_qn = new query_node(qnodes.size(), qnodes[root]->name, Schema);\r
- new_qn->name += mangler;\r
- new_qn->mangler = mangler;\r
- new_qn->refd_tbls = new_sources;\r
- par_hfta->query_node_indices.push_back(qnodes.size());\r
- par_qnode_map[new_qn->name] = qnodes.size();\r
- name_node_map[ new_qn->name ] = qnodes.size();\r
- qnodes.push_back(new_qn);\r
- }else{\r
-// regular query node\r
- for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){\r
- int hqn_idx = hfta_sets[i]->query_node_indices[h];\r
- table_exp_t *dup_pt = dup_table_exp(qnodes[hqn_idx]->parse_tree);\r
-// rehome the from clause on mangled names.\r
-// create merge nodes as needed for external sources.\r
- for(f=0;f<dup_pt->fm->tlist.size();++f){\r
- if(local_nodes.count(dup_pt->fm->tlist[f]->schema_name)>0){\r
- dup_pt->fm->tlist[f]->schema_name += mangler;\r
- }else if(hfta_name_map.count(dup_pt->fm->tlist[f]->schema_name)>0){\r
-// References an external HFTA. No parallelism => leave as is. Else, if the level of parallelism of the two hftas is the same, rename by the mangler. Else, there must be more sources, so create a merge node.\r
- int other_hidx = hfta_name_map[dup_pt->fm->tlist[f]->schema_name];\r
- if(par_hfta->n_parallel == hfta_sets[other_hidx]->n_parallel){\r
- dup_pt->fm->tlist[f]->schema_name += mangler;\r
- }else{\r
- vector<string> src_tbls;\r
- int stride = hfta_sets[other_hidx]->n_parallel / par_hfta->n_parallel;\r
- if(stride == 0){\r
- fprintf(stderr,"INTERNAL ERROR, parent hfta %s has a larger parallelism than its child %s\n",par_hfta->name.c_str(), hfta_sets[other_hidx]->name.c_str());\r
- exit(1);\r
- }\r
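-// Parallel copy p of this hfta merges source copies [p*stride, (p+1)*stride).\r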
- for(s=0;s<stride;++s){\r
- string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s+stride*p);\r
- src_tbls.push_back(ext_src_name);\r
- }\r
- table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);\r
- string merge_node_name = qnodes[hqn_idx]->name+"__copy"+int_to_string(p)+"__merge"+int_to_string(f);\r
- dup_pt->fm->tlist[f]->schema_name = merge_node_name;\r
-// Make a qnode to represent the new merge node\r
- query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);\r
- qn_pt->refd_tbls = src_tbls;\r
- qn_pt->is_udop = false;\r
- qn_pt->is_externally_visible = false;\r
- qn_pt->inferred_visible_node = false;\r
- par_hfta->query_node_indices.push_back(qnodes.size());\r
- par_qnode_map[merge_node_name] = qnodes.size();\r
- name_node_map[ merge_node_name ] = qnodes.size();\r
- qnodes.push_back(qn_pt);\r
- }\r
- }\r
- }\r
- query_node *new_qn = new query_node(qnodes.size(),qnodes[hqn_idx]->name+"__copy"+int_to_string(p),qnodes[hqn_idx]->file,dup_pt);\r
- for(f=0;f<dup_pt->fm->tlist.size();++f){\r
- new_qn->refd_tbls.push_back(dup_pt->fm->tlist[f]->schema_name);\r
- }\r
- new_qn->params = qnodes[hqn_idx]->params;\r
- new_qn->is_udop = false;\r
- new_qn->is_externally_visible = qnodes[hqn_idx]->is_externally_visible;\r
- new_qn->inferred_visible_node = qnodes[hqn_idx]->inferred_visible_node;\r
- par_hfta->query_node_indices.insert(par_hfta->query_node_indices.begin(),qnodes.size());\r
- par_qnode_map[new_qn->name] = qnodes.size();\r
- name_node_map[ new_qn->name ] = qnodes.size();\r
- qnodes.push_back(new_qn);\r
- }\r
- }\r
- hfta_name_map[par_hfta->name] = hfta_sets.size();\r
- hfta_sets.push_back(par_hfta);\r
- }\r
- }else{\r
-// This hfta isn't being parallelized, but add merge nodes for any parallelized\r
-// hfta sources.\r
- if(!hfta_sets[i]->is_udop){\r
- for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){\r
- int hqn_idx = hfta_sets[i]->query_node_indices[h];\r
- for(f=0;f<qnodes[hqn_idx]->parse_tree->fm->tlist.size();++f){\r
- if(hfta_name_map.count(qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name)>0){\r
-// References an external HFTA. No parallelism => leave as is. Else, if the level of parallelism of the two hftas is the same, rename by the mangler. Else, there must be more sources, so create a merge node.\r
- int other_hidx = hfta_name_map[qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name];\r
- if(hfta_sets[i]->n_parallel != hfta_sets[other_hidx]->n_parallel){\r
- vector<string> src_tbls;\r
- for(s=0;s<hfta_sets[other_hidx]->n_parallel;++s){\r
- string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s);\r
- src_tbls.push_back(ext_src_name);\r
- }\r
- table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);\r
- string merge_node_name = qnodes[hqn_idx]->name+"__merge"+int_to_string(f);\r
- qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name = merge_node_name;\r
-// Make a qnode to represent the new merge node\r
- query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);\r
- qn_pt->refd_tbls = src_tbls;\r
- qn_pt->is_udop = false;\r
- qn_pt->is_externally_visible = false;\r
- qn_pt->inferred_visible_node = false;\r
- hfta_sets[i]->query_node_indices.insert(hfta_sets[i]->query_node_indices.begin(),qnodes.size());\r
- name_node_map[ merge_node_name ] = qnodes.size();\r
- qnodes.push_back(qn_pt);\r
- }\r
- }\r
- }\r
- }\r
- }\r
- }\r
- }\r
-\r
-// Rebuild the reads_from / sources_to lists in the qnodes\r
- for(q=0;q<qnodes.size();++q){\r
- qnodes[q]->reads_from.clear();\r
- qnodes[q]->sources_to.clear();\r
- }\r
- for(q=0;q<qnodes.size();++q){\r
- for(s=0;s<qnodes[q]->refd_tbls.size();++s){\r
- if(name_node_map.count(qnodes[q]->refd_tbls[s])>0){\r
- int rf = name_node_map[qnodes[q]->refd_tbls[s]];\r
- qnodes[q]->reads_from.insert(rf);\r
- qnodes[rf]->sources_to.insert(q);\r
- }\r
- }\r
- }\r
-\r
-// Rebuild the reads_from / sources_to lists in hfta_sets\r
- for(q=0;q<hfta_sets.size();++q){\r
- hfta_sets[q]->reads_from.clear();\r
- hfta_sets[q]->sources_to.clear();\r
- }\r
- for(q=0;q<hfta_sets.size();++q){\r
- for(s=0;s<hfta_sets[q]->query_node_indices.size();++s){\r
- int node = hfta_sets[q]->query_node_indices[s];\r
- set<int>::iterator rfsii;\r
- for(rfsii=qnodes[node]->reads_from.begin();rfsii!=qnodes[node]->reads_from.end();++rfsii){\r
- if(hfta_name_map.count(qnodes[(*rfsii)]->name)>0){\r
- hfta_sets[q]->reads_from.insert(hfta_name_map[qnodes[(*rfsii)]->name]);\r
- hfta_sets[hfta_name_map[qnodes[(*rfsii)]->name]]->sources_to.insert(q);\r
- }\r
- }\r
- }\r
- }\r
-\r
-/*\r
-for(q=0;q<qnodes.size();++q){\r
- printf("qnode %d reads-from %d:",q,qnodes[q]->reads_from.size());\r
- set<int>::iterator rsii;\r
- for(rsii=qnodes[q]->reads_from.begin();rsii!=qnodes[q]->reads_from.end();++rsii)\r
- printf(" %d",(*rsii));\r
- printf(", and sources-to %d:",qnodes[q]->sources_to.size());\r
- for(rsii=qnodes[q]->sources_to.begin();rsii!=qnodes[q]->sources_to.end();++rsii)\r
- printf(" %d",(*rsii));\r
- printf("\n");\r
-}\r
-\r
-for(q=0;q<hfta_sets.size();++q){\r
- if(hfta_sets[q]->do_generation==false)\r
- continue;\r
- printf("hfta %d (%s) reads-from %d:",q,hfta_sets[q]->name.c_str(),hfta_sets[q]->reads_from.size());\r
- set<int>::iterator rsii;\r
- for(rsii=hfta_sets[q]->reads_from.begin();rsii!=hfta_sets[q]->reads_from.end();++rsii)\r
- printf(" %d",(*rsii));\r
- printf(", and sources-to %d:",hfta_sets[q]->sources_to.size());\r
- for(rsii=hfta_sets[q]->sources_to.begin();rsii!=hfta_sets[q]->sources_to.end();++rsii)\r
- printf(" %d",(*rsii));\r
- printf("\n");\r
-}\r
-*/\r
-\r
-\r
-\r
-// Re-topo sort the hftas\r
- hfta_topsort.clear();\r
- workq.clear();\r
- int hnode_srcs_2[hfta_sets.size()];\r
- for(i=0;i<hfta_sets.size();++i){\r
- hnode_srcs_2[i] = 0;\r
- if(hfta_sets[i]->sources_to.size() == 0 && hfta_sets[i]->do_generation){\r
- workq.push_back(i);\r
- }\r
- }\r
-\r
- while(workq.empty() == false){\r
- int node = workq.front();\r
- workq.pop_front();\r
- hfta_topsort.push_back(node);\r
- set<int>::iterator stsii;\r
- for(stsii=hfta_sets[node]->reads_from.begin();stsii!=hfta_sets[node]->reads_from.end();++stsii){\r
- int child = (*stsii);\r
- hnode_srcs_2[child]++;\r
- if(hnode_srcs_2[child] == hfta_sets[child]->sources_to.size()){\r
- workq.push_back(child);\r
- }\r
- }\r
- }\r
-\r
-// Ensure that all of the query_node_indices in hfta_sets are topologically\r
-// sorted, don't rely on assumptions that all transforms maintain some kind of order.\r
- for(i=0;i<hfta_sets.size();++i){\r
- if(hfta_sets[i]->do_generation){\r
- map<int,int> n_accounted;\r
- vector<int> new_order;\r
- workq.clear();\r
- vector<int>::iterator vii;\r
- for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){\r
- n_accounted[(*vii)]= 0;\r
- }\r
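-// Dependencies on query nodes outside this hfta count as already satisfied,\r
-// so only intra-hfta reads_from edges constrain the ordering.\r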
- for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){\r
- set<int>::iterator rfsii;\r
- for(rfsii=qnodes[(*vii)]->reads_from.begin();rfsii!=qnodes[(*vii)]->reads_from.end();++rfsii){\r
- if(n_accounted.count((*rfsii)) == 0){\r
- n_accounted[(*vii)]++;\r
- }\r
- }\r
- if(n_accounted[(*vii)] == qnodes[(*vii)]->reads_from.size()){\r
- workq.push_back((*vii));\r
- }\r
- }\r
-\r
- while(workq.empty() == false){\r
- int node = workq.front();\r
- workq.pop_front();\r
- new_order.push_back(node);\r
- set<int>::iterator stsii;\r
- for(stsii=qnodes[node]->sources_to.begin();stsii!=qnodes[node]->sources_to.end();++stsii){\r
- if(n_accounted.count((*stsii))){\r
- n_accounted[(*stsii)]++;\r
- if(n_accounted[(*stsii)] == qnodes[(*stsii)]->reads_from.size()){\r
- workq.push_back((*stsii));\r
- }\r
- }\r
- }\r
- }\r
- hfta_sets[i]->query_node_indices = new_order;\r
- }\r
- }\r
-\r
-\r
-\r
-\r
-\r
-/// Global checking is done, start the analysis and translation\r
-/// of the query parse tree in the order specified by process_order\r
-\r
-\r
-// Get a list of the LFTAs for global lfta optimization\r
-// TODO: separate building operators from splitting lftas,\r
-// that will make optimizations such as predicate pushing easier.\r
- vector<stream_query *> lfta_list;\r
-\r
- stream_query *rootq;\r
-\r
- int qi,qj;\r
-\r
- for(qi=hfta_topsort.size()-1;qi>=0;--qi){\r
-\r
- int hfta_id = hfta_topsort[qi];\r
- vector<int> curr_list = hfta_sets[hfta_id]->query_node_indices;\r
-\r
-\r
-\r
-// Two possibilities: either it's a UDOP, or it's a collection of queries.\r
-// if(qnodes[curr_list.back()]->is_udop)\r
- if(hfta_sets[hfta_id]->is_udop){\r
- int node_id = curr_list.back();\r
- int udop_schref = Schema->find_tbl(qnodes[node_id]->file);\r
- opview_entry *opv = new opview_entry();\r
-\r
-// Many of the UDOP properties aren't currently used.\r
- opv->parent_qname = "no_parent";\r
- opv->root_name = qnodes[node_id]->name;\r
- opv->view_name = qnodes[node_id]->file;\r
- opv->pos = qi;\r
- sprintf(tmpstr,"%s_UDOP%d_%s",qnodes[node_id]->name.c_str(),qi,opv->view_name.c_str());\r
- opv->udop_alias = tmpstr;\r
- opv->mangler = qnodes[node_id]->mangler;\r
-\r
- if(opv->mangler != ""){\r
- int new_udop_schref = Schema->add_duplicate_table(opv->view_name,opv->root_name);\r
- Schema->mangle_subq_names(new_udop_schref,opv->mangler);\r
- }\r
-\r
-// This piece of code makes each hfta which refers to the same udop\r
-// reference a distinct running udop. Do this at query optimization time?\r
-// fmtbl->set_udop_alias(opv->udop_alias);\r
-\r
- opv->exec_fl = Schema->get_op_prop(udop_schref, string("file"));\r
- opv->liveness_timeout = atoi(Schema->get_op_prop(udop_schref, string("liveness_timeout")).c_str());\r
-\r
- vector<subquery_spec *> subq = Schema->get_subqueryspecs(udop_schref);\r
- int s,f,q;\r
- for(s=0;s<subq.size();++s){\r
-// Validate that the fields match.\r
- subquery_spec *sqs = subq[s];\r
- string subq_name = sqs->name + opv->mangler;\r
- vector<field_entry *> flds = Schema->get_fields(subq_name);\r
- if(flds.size() == 0){\r
- fprintf(stderr,"INTERNAL ERROR: the subquery %s of view %s not found in Schema.\n",subq_name.c_str(), opv->view_name.c_str());\r
- return(1);\r
- }\r
- if(flds.size() < sqs->types.size()){\r
- fprintf(stderr,"ERROR: subquery %s of view %s does not have enough fields (%lu found, %lu expected).\n",subq_name.c_str(), opv->view_name.c_str(),flds.size(), sqs->types.size());\r
- return(1);\r
- }\r
- bool failed = false;\r
- for(f=0;f<sqs->types.size();++f){\r
- data_type dte(sqs->types[f],sqs->modifiers[f]);\r
- data_type dtf(flds[f]->get_type(),flds[f]->get_modifier_list());\r
- if(! dte.subsumes_type(&dtf) ){\r
- fprintf(stderr,"ERROR: subquery %s of view %s does not have the correct type for field %d (%s found, %s expected).\n",subq_name.c_str(), opv->view_name.c_str(),f,dtf.to_string().c_str(), dte.to_string().c_str());\r
- failed = true;\r
- }\r
-/*\r
- if(dte.is_temporal() && (dte.get_temporal() != dtf.get_temporal()) ){\r
- string pstr = dte.get_temporal_string();\r
- fprintf(stderr,"ERROR: subquery %s of view %s does not have the expected temporal value %s of field %d.\n",sqs->name.c_str(), opv->view_name.c_str(),pstr.c_str(),f);\r
- failed = true;\r
- }\r
-*/\r
- }\r
- if(failed)\r
- return(1);\r
-/// Validation done, find the subquery, make a copy of the\r
-/// parse tree, and add it to the return list.\r
- for(q=0;q<qnodes.size();++q)\r
- if(qnodes[q]->name == subq_name)\r
- break;\r
- if(q==qnodes.size()){\r
- fprintf(stderr,"INTERNAL ERROR: subquery %s of view %s not found in list of query names.\n",subq_name.c_str(), opv->view_name.c_str());\r
- return(1);\r
- }\r
-\r
- }\r
-\r
-// Cross-link to the FROM-clause entry(s) in all sourced-to queries.\r
- set<int>::iterator sii;\r
- for(sii=qnodes[curr_list.back()]->sources_to.begin() ;sii!=qnodes[curr_list.back()]->sources_to.end();++sii){\r
-//printf("\tUDOP %s sources_to %d (%s)\n",hfta_sets[hfta_id]->name.c_str(),(*sii),hfta_sets[(*sii)]->name.c_str());\r
- vector<tablevar_t *> tblvars = qnodes[(*sii)]->parse_tree->get_from()->get_table_list();\r
- int ii;\r
- for(ii=0;ii<tblvars.size();++ii){\r
- if(tblvars[ii]->schema_name == opv->root_name){\r
- tblvars[ii]->set_opview_idx(opviews.size());\r
- }\r
-\r
- }\r
- }\r
-\r
- opviews.append(opv);\r
- }else{\r
-\r
-// Analyze the parse trees in this query,\r
-// put them in rootq\r
-// vector<int> curr_list = process_sets[qi];\r
-\r
-\r
-////////////////////////////////////////\r
-\r
- rootq = NULL;\r
-//printf("Process set %d, has %d queries\n",qi,curr_list.size());\r
- for(qj=0;qj<curr_list.size();++qj){\r
- i = curr_list[qj];\r
- fprintf(stderr,"Processing query %s (file %s) is_udop = %d\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(),qnodes[i]->is_udop);\r
-\r
-// Select the current query parse tree\r
- table_exp_t *fta_parse_tree = qnodes[i]->parse_tree;\r
-\r
-// if hfta only, try to fetch any missing schemas\r
-// from the registry (using the print_schema program).\r
-// Here I use a hack to avoid analyzing the query -- all referenced\r
-// tables must be in the from clause\r
-// If there is a problem loading any table, just issue a warning,\r
-//\r
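-// The fetch runs the print_schema utility via popen(), captures its textual\r
-// schema output, re-parses it with the FTA parser, and appends any tables it\r
-// defines to the in-memory Schema.\r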
- tablevar_list_t *fm = fta_parse_tree->get_from();\r
- vector<string> refd_tbls = fm->get_src_tbls(Schema);\r
-// iterate over all referenced tables\r
- int t;\r
- for(t=0;t<refd_tbls.size();++t){\r
- int tbl_ref = Schema->get_table_ref(refd_tbls[t]);\r
-\r
- if(tbl_ref < 0){ // if this table is not in the Schema\r
-\r
- if(hfta_only){\r
- string cmd="print_schema "+refd_tbls[t];\r
- FILE *schema_in = popen(cmd.c_str(), "r");\r
- if(schema_in == NULL){\r
- fprintf(stderr,"ERROR: cannot execute command %s\n",cmd.c_str());\r
- }else{\r
- string schema_instr;\r
- while(fgets(tmpstr,TMPSTRLEN,schema_in)){\r
- schema_instr += tmpstr;\r
- }\r
- fta_parse_result = new fta_parse_t();\r
- strcpy(tmp_schema_str,schema_instr.c_str());\r
- FtaParser_setstringinput(tmp_schema_str);\r
- if(FtaParserparse()){\r
- fprintf(stderr,"FTA parse failed on result from command %s.\n",cmd.c_str());\r
- }else{\r
- if( fta_parse_result->tables != NULL){\r
- int tl;\r
- for(tl=0;tl<fta_parse_result->tables->size();++tl){\r
- Schema->add_table(fta_parse_result->tables->get_table(tl));\r
- }\r
- }else{\r
- fprintf(stderr,"ERROR command %s returned no tables.\n",cmd.c_str());\r
- }\r
- }\r
- }\r
- }else{\r
- fprintf(stderr,"ERROR, query %s (file %s) references stream %s, which is neither a PROTOCOL nor an externally visible query stream.\n", qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), refd_tbls[t].c_str());\r
- exit(1);\r
- }\r
-\r
- }\r
- }\r
-\r
-\r
-// Analyze the query.\r
- query_summary_class *qs = analyze_fta(fta_parse_tree, Schema, Ext_fcns, qnodes[i]->name);\r
- if(qs == NULL){\r
- fprintf(stderr,"analyze_fta failed on query %s (file %s) \n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str());\r
- exit(1);\r
- }\r
-\r
- stream_query new_sq(qs, Schema);\r
- if(new_sq.error_code){\r
- fprintf(stderr,"ERROR, can't create query node for query %s (file %s):\n%s\n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str(),new_sq.err_str.c_str());\r
- exit(1);\r
- }\r
-\r
-// Add it to the Schema\r
- table_def *output_td = new_sq.get_output_tabledef();\r
- Schema->add_table(output_td);\r
-\r
-// Create a query plan from the analyzed parse tree.\r
-// If it's a query referenced via FROM, add it to the stream query.\r
- if(rootq){\r
- rootq->add_query(new_sq);\r
- }else{\r
- rootq = new stream_query(new_sq);\r
-// have the stream query object inherit properties from the analyzed\r
-// hfta_node object.\r
- rootq->set_nparallel(hfta_sets[hfta_id]->n_parallel,hfta_sets[hfta_id]->parallel_idx);\r
- rootq->n_successors = hfta_sets[hfta_id]->sources_to.size();\r
- }\r
-\r
-\r
- }\r
-\r
-// This stream query has all its parts\r
-// Build and optimize it.\r
-//printf("translate_fta: generating plan.\n");\r
- if(rootq->generate_plan(Schema)){\r
- fprintf(stderr,"INTERNAL ERROR: Can't generate query plan for query %s.\n",rootq->query_name.c_str());\r
- continue;\r
- }\r
-\r
-// If we've found the query plan head, add the output operators.\r
- if(qname_to_ospec.count(hfta_sets[hfta_id]->source_name)){\r
- pair< multimap<string, int>::iterator, multimap<string, int>::iterator > oset;\r
- multimap<string, int>::iterator mmsi;\r
- oset = qname_to_ospec.equal_range(hfta_sets[hfta_id]->source_name);\r
- for(mmsi=oset.first; mmsi!=oset.second; ++mmsi){\r
- rootq->add_output_operator(output_specs[(*mmsi).second]);\r
- }\r
- }\r
-\r
-\r
-\r
-// Perform query splitting if necessary.\r
- bool hfta_returned;\r
- vector<stream_query *> split_queries = rootq->split_query(Ext_fcns, Schema, hfta_returned, ifaces_db, n_virtual_interfaces, hfta_sets[hfta_id]->n_parallel, hfta_sets[hfta_id]->parallel_idx);\r
-\r
- int l;\r
-//for(l=0;l<split_queries.size();++l){\r
-//printf("split query %d is %s\n",l,split_queries[l]->query_name.c_str());\r
-//}\r
-\r
-\r
-\r
-\r
- if(split_queries.size() > 0){ // should be at least one component.\r
-\r
-// Compute the number of LFTAs.\r
- int n_lfta = split_queries.size();\r
- if(hfta_returned) n_lfta--;\r
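-// split_query() places the lfta components first and the hfta component (when\r
-// hfta_returned is set) last, so split_queries[0 .. n_lfta-1] are the lftas.\r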
-\r
-\r
-// Process the LFTA components.\r
- for(l=0;l<n_lfta;++l){\r
- if(lfta_names.count(split_queries[l]->query_name) == 0){\r
-// Grab the lfta for global optimization.\r
- vector<tablevar_t *> tvec = split_queries[l]->query_plan[0]->get_input_tbls();\r
- string liface = tvec[0]->get_interface();\r
- string lmach = tvec[0]->get_machine();\r
- if (lmach == "")\r
- lmach = hostname;\r
- interface_names.push_back(liface);\r
- machine_names.push_back(lmach);\r
-//printf("Machine is %s\n",lmach.c_str());\r
-\r
-// Set the ht size from the recommendation, if there is one in the rec file\r
- if(lfta_htsize.count(split_queries[l]->query_name)>0){\r
- split_queries[l]->query_plan[0]->set_definition("aggregate_slots",int_to_string(lfta_htsize[split_queries[l]->query_name]));\r
- }\r
-\r
-\r
- lfta_names[split_queries[l]->query_name] = lfta_list.size();\r
- split_queries[l]->set_gid(lfta_list.size()); // set lfta global id\r
- lfta_list.push_back(split_queries[l]);\r
- lfta_mach_lists[lmach].push_back(split_queries[l]);\r
-\r
-// The following is a hack,\r
-// as I should be generating LFTA code through\r
-// the stream_query object.\r
- split_queries[l]->query_plan[0]->bind_to_schema(Schema);\r
-// split_queries[l]->query_plan[0]->definitions = split_queries[l]->defines;\r
-\r
-/*\r
-// Create query description to embed in lfta.c\r
- string lfta_schema_str = split_queries[l]->make_schema();\r
- string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);\r
-\r
-// get NIC capabilities.\r
- int erri;\r
- nic_property *nicprop = NULL;\r
- vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);\r
- if(iface_codegen_type.size()){\r
- nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);\r
- if(!nicprop){\r
- fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());\r
- exit(1);\r
- }\r
- }\r
-\r
- lfta_val[lmach] += generate_lfta_block(split_queries[l]->query_plan[0], Schema, split_queries[l]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop);\r
-*/\r
-\r
- snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema));\r
- query_names.push_back(split_queries[l]->query_name);\r
- mach_query_names[lmach].push_back(query_names.size()-1);\r
-// NOTE: I will assume a 1-1 correspondence between\r
-// mach_query_names[lmach] and lfta_mach_lists[lmach]\r
-// where mach_query_names[lmach][i] contains the index into\r
-// query_names, which names the lfta, and\r
-// lfta_mach_lists[lmach][i] is the stream_query * of the\r
-// corresponding lfta.\r
-// Later, lfta_iface_qnames are the query names matching lfta_iface_lists\r
-\r
-\r
-\r
- // check if lfta is reusable\r
- // FTA is reusable if reusable option is set explicitly or it doesn't have any parameters\r
-\r
- bool lfta_reusable = false;\r
- if (split_queries[l]->query_plan[0]->get_val_of_def("reusable") == "yes" ||\r
- split_queries[l]->query_plan[0]->get_param_tbl()->size() == 0) {\r
- lfta_reusable = true;\r
- }\r
- lfta_reuse_options.push_back(lfta_reusable);\r
-\r
-	// LFTA will inherit the liveness timeout specification from the containing query.\r
-	// It is too conservative, as lftas are expected to spend less time per tuple\r
-	// than the full query.\r
-\r
- // extract liveness timeout from query definition\r
- int liveness_timeout = atoi(split_queries[l]->query_plan[0]->get_val_of_def("liveness_timeout").c_str());\r
- if (!liveness_timeout) {\r
-// fprintf(stderr, "WARNING: No liveness timeout specified for lfta %s, using default value of %d\n",\r
-// split_queries[l]->query_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);\r
- liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;\r
- }\r
- lfta_liveness_timeouts.push_back(liveness_timeout);\r
-\r
-// Add it to the schema\r
- table_def *td = split_queries[l]->get_output_tabledef();\r
- Schema->append_table(td);\r
-//printf("added lfta %d (%s)\n",l,split_queries[l]->query_name.c_str());\r
-\r
- }\r
- }\r
-\r
-// If the output is lfta-only, dump out the query name.\r
- if(split_queries.size() == 1 && !hfta_returned){\r
- if(output_query_names ){\r
- fprintf(query_name_output,"%s L\n",split_queries[0]->query_name.c_str());\r
- }\r
-/*\r
-else{\r
- fprintf(stderr,"query name is %s\n",split_queries[0]->query_name.c_str());\r
- }\r
-*/\r
-\r
-/*\r
-// output schema summary\r
- if(output_schema_summary){\r
- dump_summary(split_queries[0]);\r
- }\r
-*/\r
- }\r
-\r
-\r
- if(hfta_returned){ // query also has an HFTA component\r
- int hfta_nbr = split_queries.size()-1;\r
-\r
- hfta_list.push_back(split_queries[hfta_nbr]);\r
-\r
-// report on generated query names\r
- if(output_query_names){\r
- string hfta_name =split_queries[hfta_nbr]->query_name;\r
- fprintf(query_name_output,"%s H\n",hfta_name.c_str());\r
- for(l=0;l<hfta_nbr;++l){\r
- string lfta_name =split_queries[l]->query_name;\r
- fprintf(query_name_output,"%s L\n",lfta_name.c_str());\r
- }\r
- }\r
-// else{\r
-// fprintf(stderr,"query names are ");\r
-// for(l=0;l<hfta_nbr;++l){\r
-// if(l>0) fprintf(stderr,",");\r
-// string fta_name =split_queries[l]->query_name;\r
-// fprintf(stderr," %s",fta_name.c_str());\r
-// }\r
-// fprintf(stderr,"\n");\r
-// }\r
- }\r
-\r
- }else{\r
- fprintf(stderr,"ERROR, query in file %s has a bug.\n",qnodes[i]->file.c_str());\r
- fprintf(stderr,"%s\n",rootq->get_error_str().c_str());\r
- exit(1);\r
- }\r
- }\r
-}\r
-\r
-\r
-//-----------------------------------------------------------------\r
-// Compute and propagate the SEs, in terms of PROTOCOL fields, that compute each output field.\r
-//-----------------------------------------------------------------\r
-\r
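-// lftas are visited before hftas, and each list is in producer-before-consumer\r
-// order, so sq_map should already hold a query's sources when its protocol SEs\r
-// are propagated.\r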
-for(i=0;i<lfta_list.size();i++){\r
- lfta_list[i]->generate_protocol_se(sq_map, Schema);\r
- sq_map[lfta_list[i]->get_sq_name()] = lfta_list[i];\r
-}\r
-for(i=0;i<hfta_list.size();i++){\r
- hfta_list[i]->generate_protocol_se(sq_map, Schema);\r
- sq_map[hfta_list[i]->get_sq_name()] = hfta_list[i];\r
-}\r
-\r
-\r
-\r
-//------------------------------------------------------------------------\r
-// Perform individual FTA optimizations\r
-//-----------------------------------------------------------------------\r
-\r
-if (partitioned_mode) {\r
-\r
- // open partition definition file\r
- string part_fname = config_dir_path + "partition.txt";\r
-\r
- FILE* partfd = fopen(part_fname.c_str(), "r");\r
- if (!partfd) {\r
- fprintf(stderr,"ERROR, unable to open partition definition file %s\n",part_fname.c_str());\r
- exit(1);\r
- }\r
- PartnParser_setfileinput(partfd);\r
- if (PartnParserparse()) {\r
- fprintf(stderr,"ERROR, unable to parse partition definition file %s\n",part_fname.c_str());\r
- exit(1);\r
- }\r
- fclose(partfd);\r
-}\r
-\r
-\r
-print_hftas("preopt_hfta_info.txt", hfta_list, lfta_names, query_names, interface_names);\r
-\r
-int num_hfta = hfta_list.size();\r
-for(i=0; i < hfta_list.size(); ++i){\r
- hfta_list[i]->optimize(hfta_list, lfta_names, interface_names, machine_names, Ext_fcns, Schema, ifaces_db, partn_parse_result);\r
-}\r
-\r
-// Add all new hftas to schema\r
-for(i=num_hfta; i < hfta_list.size(); ++i){\r
- table_def *td = hfta_list[i]->get_output_tabledef();\r
- Schema->append_table(td);\r
-}\r
-\r
-print_hftas("postopt_hfta_info.txt", hfta_list, lfta_names, query_names, interface_names);\r
-\r
-\r
-\r
-//------------------------------------------------------------------------\r
-// Do global (cross-fta) optimization\r
-//-----------------------------------------------------------------------\r
-\r
-\r
-\r
-\r
-\r
-\r
-set<string> extra_external_libs;\r
-\r
-for(i=0;i<hfta_list.size();++i){ // query also has an HFTA component\r
-\r
- if(! debug_only){\r
-// build hfta file name, create output\r
- if(numeric_hfta_flname){\r
- sprintf(tmpstr,"hfta_%d",hfta_count);\r
- hfta_names.push_back(tmpstr);\r
- sprintf(tmpstr,"hfta_%d.cc",hfta_count);\r
- }else{\r
- sprintf(tmpstr,"hfta_%s",hfta_list[i]->query_name.c_str());\r
- hfta_names.push_back(tmpstr);\r
- sprintf(tmpstr,"hfta_%s.cc",hfta_list[i]->query_name.c_str());\r
- }\r
- FILE *hfta_fl = fopen(tmpstr,"w");\r
- if(hfta_fl == NULL){\r
- fprintf(stderr,"ERROR can't open fta output file %s\n",tmpstr);\r
- exit(1);\r
- }\r
- fprintf(hfta_fl,"%s\n\n",hfta_list[i]->generate_hfta(Schema, Ext_fcns, opviews, distributed_mode).c_str());\r
-\r
-// If there is a field verifier, warn about\r
-// lack of compatibility\r
-// NOTE : this code assumes that visible non-lfta queries\r
-// are those at the root of a stream query.\r
- string hfta_comment;\r
- string hfta_title;\r
- string hfta_namespace;\r
- if(hfta_list[i]->defines.count("comment")>0)\r
- hfta_comment = hfta_list[i]->defines["comment"];\r
- if(hfta_list[i]->defines.count("Comment")>0)\r
- hfta_comment = hfta_list[i]->defines["Comment"];\r
- if(hfta_list[i]->defines.count("COMMENT")>0)\r
- hfta_comment = hfta_list[i]->defines["COMMENT"];\r
- if(hfta_list[i]->defines.count("title")>0)\r
- hfta_title = hfta_list[i]->defines["title"];\r
- if(hfta_list[i]->defines.count("Title")>0)\r
- hfta_title = hfta_list[i]->defines["Title"];\r
- if(hfta_list[i]->defines.count("TITLE")>0)\r
- hfta_title = hfta_list[i]->defines["TITLE"];\r
- if(hfta_list[i]->defines.count("namespace")>0)\r
- hfta_namespace = hfta_list[i]->defines["namespace"];\r
- if(hfta_list[i]->defines.count("Namespace")>0)\r
- hfta_namespace = hfta_list[i]->defines["Namespace"];\r
-		if(hfta_list[i]->defines.count("NAMESPACE")>0)\r
-			hfta_namespace = hfta_list[i]->defines["NAMESPACE"];\r
-\r
- if(field_verifier != NULL){\r
- string warning_str;\r
- if(hfta_comment == "")\r
- warning_str += "\tcomment not found.\n";\r
- if(hfta_title == "")\r
- warning_str += "\ttitle not found.\n";\r
- if(hfta_namespace == "")\r
- warning_str += "\tnamespace not found.\n";\r
-\r
- vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();\r
- int fi;\r
- for(fi=0;fi<flds.size();fi++){\r
- field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);\r
- }\r
- if(warning_str != "")\r
- fprintf(stderr,"Warning, in HFTA stream %s:\n%s",\r
- hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(),warning_str.c_str());\r
- }\r
-\r
- fprintf(qtree_output,"\t<HFTA name='%s'>\n",hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());\r
- if(hfta_comment != "")\r
- fprintf(qtree_output,"\t\t<Description value='%s' />\n",hfta_comment.c_str());\r
- if(hfta_title != "")\r
- fprintf(qtree_output,"\t\t<Title value='%s' />\n",hfta_title.c_str());\r
- if(hfta_namespace != "")\r
- fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",hfta_namespace.c_str());\r
- fprintf(qtree_output,"\t\t<FileName value='%s' />\n",tmpstr);\r
- fprintf(qtree_output,"\t\t<Rate value='100' />\n");\r
-\r
-// write info about fields to qtree.xml\r
- vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();\r
- int fi;\r
- for(fi=0;fi<flds.size();fi++){\r
- fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());\r
- if(flds[fi]->get_modifier_list()->size()){\r
- fprintf(qtree_output,"mods='%s' ", flds[fi]->get_modifier_list()->to_string().c_str());\r
- }\r
- fprintf(qtree_output," />\n");\r
- }\r
-\r
- // extract liveness timeout from query definition\r
- int liveness_timeout = atoi(hfta_list[i]->query_plan[hfta_list[i]->qhead]->get_val_of_def("liveness_timeout").c_str());\r
- if (!liveness_timeout) {\r
-// fprintf(stderr, "WARNING: No liveness timeout specified for hfta %s, using default value of %d\n",\r
-// hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);\r
- liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;\r
- }\r
- fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", liveness_timeout);\r
-\r
- vector<tablevar_t *> tmp_tv = hfta_list[i]->get_input_tables();\r
- int itv;\r
- for(itv=0;itv<tmp_tv.size();++itv){\r
- fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",tmp_tv[itv]->get_schema_name().c_str());\r
- }\r
- string ifrs = hfta_list[i]->collect_refd_ifaces();\r
- if(ifrs != ""){\r
- fprintf(qtree_output,"\t\t<Ifaces_referenced value='%s'/>\n",ifrs.c_str());\r
- }\r
- fprintf(qtree_output,"\t</HFTA>\n");\r
-\r
- fclose(hfta_fl);\r
- }else{\r
-// debug only -- do code generation to catch generation-time errors.\r
- hfta_list[i]->generate_hfta(Schema, Ext_fcns,opviews, distributed_mode);\r
- }\r
-\r
- hfta_count++; // for hfta file names with numeric suffixes\r
-\r
- hfta_list[i]->get_external_libs(extra_external_libs);\r
-\r
- }\r
-\r
-string ext_lib_string;\r
-set<string>::iterator ssi_el;\r
-for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_el)\r
- ext_lib_string += (*ssi_el)+" ";\r
-\r
-\r
-\r
-// Report on the set of operator views\r
- for(i=0;i<opviews.size();++i){\r
- opview_entry *opve = opviews.get_entry(i);\r
- fprintf(qtree_output,"\t<UDOP name='%s' >\n",opve->view_name.c_str());\r
- fprintf(qtree_output,"\t\t<FileName value='%s' />\n",opve->exec_fl.c_str());\r
- fprintf(qtree_output,"\t\t<Parent value='%s' />\n",opve->root_name.c_str());\r
- fprintf(qtree_output,"\t\t<Alias value='%s' />\n",opve->udop_alias.c_str());\r
- fprintf(qtree_output,"\t\t<Rate value='100' />\n");\r
-\r
- if (!opve->liveness_timeout) {\r
-// fprintf(stderr, "WARNING: No liveness timeout specified for view %s, using default value of %d\n",\r
-// opve->view_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);\r
- opve->liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;\r
- }\r
- fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", opve->liveness_timeout);\r
- int j;\r
- for(j=0;j<opve->subq_names.size();j++)\r
- fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",opve->subq_names[j].c_str());\r
- fprintf(qtree_output,"\t</UDOP>\n");\r
- }\r
-\r
-\r
-//-----------------------------------------------------------------\r
-\r
-// Create interface-specific meta code files.\r
-// first, open and parse the interface resources file.\r
- ifaces_db = new ifq_t();\r
- ierr = "";\r
- if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){\r
- fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",\r
- ifx_fname.c_str(), ierr.c_str());\r
- exit(1);\r
- }\r
-\r
- map<string, vector<stream_query *> >::iterator svsi;\r
- for(svsi=lfta_mach_lists.begin(); svsi!=lfta_mach_lists.end(); ++svsi){\r
- string lmach = (*svsi).first;\r
-\r
- // For this machine, create a set of lftas per interface.\r
- vector<stream_query *> mach_lftas = (*svsi).second;\r
- map<string, vector<stream_query *> > lfta_iface_lists;\r
- int li;\r
- for(li=0;li<mach_lftas.size();++li){\r
- vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();\r
- string lfta_iface = tvec[0]->get_interface();\r
- lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);\r
- }\r
-\r
- map<string, vector<stream_query *> >::iterator lsvsi;\r
- for(lsvsi=lfta_iface_lists.begin(); lsvsi!=lfta_iface_lists.end(); ++lsvsi){\r
- int erri;\r
- string liface = (*lsvsi).first;\r
- vector<stream_query *> iface_lftas = (*lsvsi).second;\r
- vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);\r
- if(iface_codegen_type.size()){\r
- nic_property *nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);\r
- if(!nicprop){\r
- fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());\r
- exit(1);\r
- }\r
- string mcs = generate_nic_code(iface_lftas, nicprop);\r
- string mcf_flnm;\r
- if(lmach != "")\r
- mcf_flnm = lmach + "_"+liface+".mcf";\r
- else\r
- mcf_flnm = hostname + "_"+liface+".mcf";\r
- FILE *mcf_fl ;\r
- if((mcf_fl = fopen(mcf_flnm.c_str(),"w")) == NULL){\r
- fprintf(stderr,"Can't open output file %s\n%s\n",mcf_flnm.c_str(),strerror(errno));\r
- exit(1);\r
- }\r
- fprintf(mcf_fl,"%s",mcs.c_str());\r
- fclose(mcf_fl);\r
-//printf("mcs of machine %s, iface %s of type %s is \n%s\n",\r
-//lmach.c_str(), liface.c_str(),iface_codegen_type[0].c_str(), mcs.c_str());\r
- }\r
- }\r
-\r
-\r
- }\r
-\r
-\r
-\r
-//-----------------------------------------------------------------\r
-\r
-\r
-// Find common filter predicates in the LFTAs.\r
-// In addition, generate structs to store the temporal attributes unpacked by the prefilter.\r
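-// Overview: predicates shared by several lftas on the same interface are pulled\r
-// into prefilter_preds[liface] so they can be evaluated up front, and pred_ids\r
-// records which (lfta, predicate) pairs were lifted so per-lfta codegen can\r
-// omit them.\r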
- \r
- map<string, vector<stream_query *> >::iterator ssqi;\r
- for(ssqi=lfta_mach_lists.begin(); ssqi!=lfta_mach_lists.end(); ++ssqi){\r
-\r
- string lmach = (*ssqi).first;\r
- bool packed_return = false;\r
- int li, erri;\r
-\r
-\r
-// The LFTAs of this machine.\r
- vector<stream_query *> mach_lftas = (*ssqi).second;\r
-// break up on a per-interface basis.\r
- map<string, vector<stream_query *> > lfta_iface_lists;\r
- map<string, vector<int> > lfta_iface_qname_ix; // need the query name\r
- // for fta_init\r
- for(li=0;li<mach_lftas.size();++li){\r
- vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();\r
- string lfta_iface = tvec[0]->get_interface();\r
- lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);\r
- lfta_iface_qname_ix[lfta_iface].push_back(mach_query_names[lmach][li]);\r
- }\r
-\r
-\r
-// Are the return values "packed"?\r
-// This should be done on a per-interface basis.\r
-// But this is defunct code for gs-lite\r
- for(li=0;li<mach_lftas.size();++li){\r
- vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();\r
- string liface = tvec[0]->get_interface();\r
- vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);\r
- if(iface_codegen_type.size()){\r
- if(npdb->get_nic_prop_val(iface_codegen_type[0],"Return",erri) == "Packed"){\r
- packed_return = true;\r
- }\r
- }\r
- }\r
-\r
-\r
-// Separate lftas by interface, collect results on a per-interface basis.\r
-\r
- vector<cnf_set *> no_preds; // fallback if there is no prefilter\r
- map<string, vector<cnf_set *> > prefilter_preds;\r
- set<unsigned int> pred_ids; // this can be global for all interfaces\r
- for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){\r
- string liface = (*mvsi).first;\r
- vector<cnf_set *> empty_list;\r
- prefilter_preds[liface] = empty_list;\r
- if(! packed_return){\r
- get_common_lfta_filter(lfta_iface_lists[liface], Schema,Ext_fcns, prefilter_preds[liface], pred_ids);\r
- }\r
-\r
-// get NIC capabilities. (Is this needed?)\r
- nic_property *nicprop = NULL;\r
- vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);\r
- if(iface_codegen_type.size()){\r
- nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);\r
- if(!nicprop){\r
- fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());\r
- exit(1);\r
- }\r
- }\r
- }\r
-\r
-\r
-// Now that we know the prefilter preds, generate the lfta code.\r
-// Do this for all lftas in this machine.\r
- for(li=0;li<mach_lftas.size();++li){\r
- set<unsigned int> subsumed_preds;\r
- set<unsigned int>::iterator sii;\r
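-// pred_ids entries pack the lfta index in the high 16 bits and the predicate\r
-// position in the low 16 bits; collect the positions the prefilter covers for\r
-// this lfta so generate_lfta_block can skip them.\r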
-#ifdef PREFILTER_OK\r
- for(sii=pred_ids.begin();sii!=pred_ids.end();++sii){\r
- int pid = (*sii);\r
- if((pid>>16) == li){\r
- subsumed_preds.insert(pid & 0xffff);\r
- }\r
- }\r
-#endif\r
- string lfta_schema_str = mach_lftas[li]->make_schema();\r
- string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);\r
- nic_property *nicprop = NULL; // no NIC properties?\r
- lfta_val[lmach] += generate_lfta_block(mach_lftas[li]->query_plan[0], Schema, mach_lftas[li]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop,subsumed_preds);\r
- }\r
-\r
-\r
-// generate structs to store the temporal attributes\r
-// unpacked by prefilter\r
- col_id_set temp_cids;\r
- get_prefilter_temporal_cids(lfta_mach_lists[lmach], temp_cids);\r
- lfta_prefilter_val[lmach] = generate_lfta_prefilter_struct(temp_cids, Schema);\r
-\r
-// Compute the lfta bit signatures and the lfta colrefs\r
-// do this on a per-interface basis\r
-#ifdef PREFILTER_OK\r
- lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";\r
-#endif\r
- map<string, vector<long long int> > lfta_sigs; // used again later\r
- for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){\r
- string liface = (*mvsi).first;\r
- vector<long long int> empty_list;\r
- lfta_sigs[liface] = empty_list;\r
-\r
- vector<col_id_set> lfta_cols;\r
- vector<int> lfta_snap_length;\r
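-// Build one bitmask ("signature") per lfta on this interface: bit f is set when\r
-// prefilter predicate f belongs to that lfta.  When the prefilter is enabled the\r
-// signature is passed to fta_register() in the generated fta_init() below.\r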
- for(li=0;li<lfta_iface_lists[liface].size();++li){\r
- unsigned long long int mask=0, bpos=1;\r
- int f_pos;\r
- for(f_pos=0;f_pos<prefilter_preds[liface].size();++f_pos){\r
- if(prefilter_preds[liface][f_pos]->lfta_id.count(li))\r
- mask |= bpos;\r
- bpos = bpos << 1;\r
- }\r
- lfta_sigs[liface].push_back(mask);\r
- lfta_cols.push_back(lfta_iface_lists[liface][li]->query_plan[0]->get_colrefs(true,Schema));\r
- lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema));\r
- }\r
-\r
-//for(li=0;li<mach_lftas.size();++li){\r
-//printf("lfta %d, mask is %llu\n",li,lfta_sigs[li]);\r
-//col_id_set::iterator tcisi;\r
-//for(tcisi=lfta_cols[li].begin(); tcisi!=lfta_cols[li].end();++tcisi){\r
-//printf("\t%s %d (%d)\n",(*tcisi).field.c_str(),(*tcisi).schema_ref, (*tcisi).tblvar_ref);\r
-//}\r
-//}\r
-\r
-\r
-// generate the prefilter\r
-// Do this on a per-interface basis, except for the #define\r
-#ifdef PREFILTER_OK\r
-// lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";\r
- lfta_val[lmach] += generate_lfta_prefilter(prefilter_preds[liface], temp_cids, Schema, Ext_fcns, lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);\r
-#else\r
- lfta_val[lmach] += generate_lfta_prefilter(no_preds, temp_cids, Schema,Ext_fcns, lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);\r
-\r
-#endif\r
- }\r
-\r
-// Generate interface parameter lookup function\r
- lfta_val[lmach] += "// lookup interface properties by name\n";\r
-	lfta_val[lmach] += "// multiple values of a given property are returned in the same string, separated by commas\n";\r
- lfta_val[lmach] += "// returns NULL if given property does not exist\n";\r
- lfta_val[lmach] += "gs_sp_t get_iface_properties (const gs_sp_t iface_name, const gs_sp_t property_name) {\n";\r
-\r
-// collect a list of interface names used by queries running on this host\r
- set<std::string> iface_names;\r
- for(i=0;i<mach_query_names[lmach].size();i++){\r
- int mi = mach_query_names[lmach][i];\r
- stream_query *lfta_sq = lfta_mach_lists[lmach][i];\r
-\r
- if(interface_names[mi]=="")\r
- iface_names.insert("DEFAULTDEV");\r
- else\r
- iface_names.insert(interface_names[mi]);\r
- }\r
-\r
-// generate interface property lookup code for every interface\r
- set<std::string>::iterator sir;\r
- for (sir = iface_names.begin(); sir != iface_names.end(); sir++) {\r
- if (sir == iface_names.begin())\r
- lfta_val[lmach] += "\tif (!strcmp(iface_name, \"" + *sir + "\")) {\n";\r
- else\r
- lfta_val[lmach] += "\t} else if (!strcmp(iface_name, \"" + *sir + "\")) {\n";\r
-\r
- // iterate through interface properties\r
- vector<string> iface_properties = ifaces_db->get_iface_properties(lmach,*sir,erri,err_str);\r
- if (erri) {\r
- fprintf(stderr,"ERROR cannot retrieve interface properties for %s.%s, %s\n",lmach.c_str(), sir->c_str(), err_str.c_str());\r
- exit(1);\r
- }\r
- if (iface_properties.empty())\r
- lfta_val[lmach] += "\t\treturn NULL;\n";\r
- else {\r
- for (int i = 0; i < iface_properties.size(); ++i) {\r
- if (i == 0)\r
- lfta_val[lmach] += "\t\tif (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";\r
- else\r
- lfta_val[lmach] += "\t\t} else if (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";\r
-\r
- // combine all values for the interface property using comma separator\r
- vector<string> vals = ifaces_db->get_iface_vals(lmach,*sir,iface_properties[i],erri,err_str);\r
- for (int j = 0; j < vals.size(); ++j) {\r
- lfta_val[lmach] += "\t\t\treturn \"" + vals[j];\r
- if (j != vals.size()-1)\r
- lfta_val[lmach] += ",";\r
- }\r
- lfta_val[lmach] += "\";\n";\r
- }\r
- lfta_val[lmach] += "\t\t} else\n";\r
- lfta_val[lmach] += "\t\t\treturn NULL;\n";\r
- }\r
- }\r
- lfta_val[lmach] += "\t} else\n";\r
- lfta_val[lmach] += "\t\treturn NULL;\n";\r
- lfta_val[lmach] += "}\n\n";\r
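-// Illustrative shape of the emitted lookup (the interface name, property name\r
-// and value below are hypothetical):\r
-//   gs_sp_t get_iface_properties (const gs_sp_t iface_name, const gs_sp_t property_name) {\r
-//       if (!strcmp(iface_name, "eth0")) {\r
-//           if (!strcmp(property_name, "InterfaceType")) {\r
-//               return "PROTO";\r
-//           } else\r
-//               return NULL;\r
-//       } else\r
-//           return NULL;\r
-//   }\r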
-\r
-\r
-// Generate a full list of FTAs for clearinghouse reference\r
-	lfta_val[lmach] += "// list of FTAs the clearinghouse expects to register themselves\n";\r
- lfta_val[lmach] += "const gs_sp_t fta_names[] = {";\r
-\r
- for (i = 0; i < query_names.size(); ++i) {\r
- if (i)\r
- lfta_val[lmach] += ", ";\r
- lfta_val[lmach] += "\"" + query_names[i] + "\"";\r
- }\r
- for (i = 0; i < hfta_list.size(); ++i) {\r
- lfta_val[lmach] += ", \"" + hfta_list[i]->query_name + "\"";\r
- }\r
- lfta_val[lmach] += ", NULL};\n\n";\r
-\r
-\r
-// Add the initialization function to lfta.c\r
-// Change to accept the interface name, and \r
-// set the prefilter function accordingly.\r
-// see the example in demo/err2\r
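-// The generated fta_init(device) contains one fta_register() call per lfta,\r
-// each guarded by a strcmp() on the interface name, and finally selects the\r
-// matching lfta_prefilter function (or exits if the device is unknown).\r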
- lfta_val[lmach] += "void fta_init(gs_sp_t device){\n";\r
-\r
-// for(i=0;i<mach_query_names[lmach].size();i++)\r
-// int mi = mach_query_names[lmach][i];\r
-// stream_query *lfta_sq = lfta_mach_lists[lmach][i];\r
-\r
- for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){\r
- string liface = (*mvsi).first;\r
- vector<stream_query *> lfta_list = (*mvsi).second;\r
- for(i=0;i<lfta_list.size();i++){\r
- stream_query *lfta_sq = lfta_list[i];\r
- int mi = lfta_iface_qname_ix[liface][i];\r
- \r
- fprintf(stderr,"interface %s, LFTA %d, snap length is %d\n",liface.c_str(),mi,snap_lengths[mi]);\r
-\r
- string this_iface = "DEFAULTDEV";\r
- if(interface_names[mi]!="")\r
- this_iface = '"'+interface_names[mi]+'"';\r
- lfta_val[lmach] += "\tif(!strcmp(device,"+this_iface+"))\n";\r
- lfta_val[lmach] += "\t\tfta_register(\""+query_names[mi]+"\", " + (lfta_reuse_options[mi]?"1":"0") + ", ";\r
-// if(interface_names[mi]=="")\r
-// lfta_val[lmach]+="DEFAULTDEV";\r
-// else\r
-// lfta_val[lmach]+='"'+interface_names[mi]+'"';\r
- lfta_val[lmach] += this_iface;\r
-\r
-\r
- lfta_val[lmach] += ", "+generate_alloc_name(query_names[mi])\r
- +"\n#ifndef LFTA_IN_NIC\n\t,"+generate_schema_string_name(query_names[mi])\r
- +"\n#endif\n";\r
- sprintf(tmpstr,",%d",snap_lengths[mi]);\r
- lfta_val[lmach] += tmpstr;\r
-\r
-// unsigned long long int mask=0, bpos=1;\r
-// int f_pos;\r
-// for(f_pos=0;f_pos<prefilter_preds.size();++f_pos){\r
-// if(prefilter_preds[f_pos]->lfta_id.count(i))\r
-// mask |= bpos;\r
-// bpos = bpos << 1;\r
-// }\r
-\r
-#ifdef PREFILTER_OK\r
-// sprintf(tmpstr,",%lluull",mask);\r
- sprintf(tmpstr,",%lluull",lfta_sigs[liface][i]);\r
- lfta_val[lmach]+=tmpstr;\r
-#else\r
- lfta_val[lmach] += ",0ull";\r
-#endif\r
-\r
- lfta_val[lmach] += ");\n";\r
-\r
-\r
-\r
-// End of lfta prefilter stuff\r
-// --------------------------------------------------\r
-\r
-// If there is a field verifier, warn about\r
-// lack of compatibility\r
- string lfta_comment;\r
- string lfta_title;\r
- string lfta_namespace;\r
- map<string,string> ldefs = lfta_sq->query_plan[0]->get_definitions();\r
- if(ldefs.count("comment")>0)\r
- lfta_comment = lfta_sq->defines["comment"];\r
- if(ldefs.count("Comment")>0)\r
- lfta_comment = lfta_sq->defines["Comment"];\r
- if(ldefs.count("COMMENT")>0)\r
- lfta_comment = lfta_sq->defines["COMMENT"];\r
- if(ldefs.count("title")>0)\r
- lfta_title = lfta_sq->defines["title"];\r
- if(ldefs.count("Title")>0)\r
- lfta_title = lfta_sq->defines["Title"];\r
- if(ldefs.count("TITLE")>0)\r
- lfta_title = lfta_sq->defines["TITLE"];\r
- if(ldefs.count("NAMESPACE")>0)\r
- lfta_namespace = lfta_sq->defines["NAMESPACE"];\r
- if(ldefs.count("Namespace")>0)\r
- lfta_namespace = lfta_sq->defines["Namespace"];\r
- if(ldefs.count("namespace")>0)\r
- lfta_namespace = lfta_sq->defines["namespace"];\r
-\r
- string lfta_ht_size;\r
- if(lfta_sq->query_plan[0]->node_type()== "sgah_qpn" || lfta_sq->query_plan[0]->node_type()== "rsgah_qpn")\r
- lfta_ht_size = int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE);\r
- if(ldefs.count("aggregate_slots")>0){\r
- lfta_ht_size = ldefs["aggregate_slots"];\r
- }\r
-\r
-// NOTE : I'm assuming that visible lftas do not start with _fta.\r
-// -- will fail for non-visible simple selection queries.\r
- if(field_verifier != NULL && query_names[mi].substr(0,1) != "_"){\r
- string warning_str;\r
- if(lfta_comment == "")\r
- warning_str += "\tcomment not found.\n";\r
- if(lfta_title == "")\r
- warning_str += "\ttitle not found.\n";\r
- if(lfta_namespace == "")\r
- warning_str += "\tnamespace not found.\n";\r
-\r
- vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();\r
- int fi;\r
- for(fi=0;fi<flds.size();fi++){\r
- field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);\r
- }\r
- if(warning_str != "")\r
- fprintf(stderr,"Warning, in LFTA stream %s:\n%s",\r
- query_names[mi].c_str(),warning_str.c_str());\r
- }\r
-\r
-\r
-// Create qtree output\r
- fprintf(qtree_output,"\t<LFTA name='%s' >\n",query_names[mi].c_str());\r
- if(lfta_comment != "")\r
- fprintf(qtree_output,"\t\t<Description value='%s' />\n",lfta_comment.c_str());\r
- if(lfta_title != "")\r
- fprintf(qtree_output,"\t\t<Title value='%s' />\n",lfta_title.c_str());\r
- if(lfta_namespace != "")\r
- fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",lfta_namespace.c_str());\r
- if(lfta_ht_size != "")\r
- fprintf(qtree_output,"\t\t<HtSize value='%s' />\n",lfta_ht_size.c_str());\r
- if(lmach != "")\r
- fprintf(qtree_output,"\t\t<Host value='%s' />\n",lmach.c_str());\r
- else\r
- fprintf(qtree_output,"\t\t<Host value='%s' />\n",hostname.c_str());\r
- fprintf(qtree_output,"\t\t<Interface value='%s' />\n",interface_names[mi].c_str());\r
- fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",interface_names[mi].c_str());\r
- fprintf(qtree_output,"\t\t<Rate value='100' />\n");\r
- fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", lfta_liveness_timeouts[mi]);\r
-// write info about fields to qtree.xml\r
- vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();\r
- int fi;\r
- for(fi=0;fi<flds.size();fi++){\r
- fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());\r
- if(flds[fi]->get_modifier_list()->size()){\r
- fprintf(qtree_output,"mods='%s' ", flds[fi]->get_modifier_list()->to_string().c_str());\r
- }\r
- fprintf(qtree_output," />\n");\r
- }\r
- fprintf(qtree_output,"\t</LFTA>\n");\r
-\r
-\r
- }\r
- }\r
-\r
- for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){\r
- string liface = (*mvsi).first;\r
- lfta_val[lmach] += \r
-" if (!strcmp(device, \""+liface+"\")) \n"\r
-" lfta_prefilter = &lfta_prefilter_"+liface+"; \n"\r
-;\r
- }\r
- lfta_val[lmach] += \r
-" if(lfta_prefilter == NULL){\n"\r
-" fprintf(stderr, \"Error, interface %s not found in fta_init.\\n\",device);\n"\r
-" exit(1);\n"\r
-" }\n"\r
-;\r
-\r
-\r
-\r
- lfta_val[lmach] += "}\n\n";\r
-\r
- if(!(debug_only || hfta_only) ){\r
- string lfta_flnm;\r
- if(lmach != "")\r
- lfta_flnm = lmach + "_lfta.c";\r
- else\r
- lfta_flnm = hostname + "_lfta.c";\r
- if((lfta_out = fopen(lfta_flnm.c_str(),"w")) == NULL){\r
- fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));\r
- exit(1);\r
- }\r
- fprintf(lfta_out,"%s",lfta_header.c_str());\r
- fprintf(lfta_out,"%s",lfta_prefilter_val[lmach].c_str());\r
- fprintf(lfta_out,"%s",lfta_val[lmach].c_str());\r
- fclose(lfta_out);\r
- }\r
- }\r
-\r
-// Report which external operators must execute\r
- if(opviews.size()>0)\r
- fprintf(stderr,"The queries use the following external operators:\n");\r
- for(i=0;i<opviews.size();++i){\r
- opview_entry *opv = opviews.get_entry(i);\r
- fprintf(stderr,"\t%s\n",opv->view_name.c_str());\r
- }\r
-\r
- if(create_makefile)\r
- generate_makefile(input_file_names, nfiles, hfta_names, opviews,\r
- machine_names, schema_file_name,\r
- interface_names,\r
- ifaces_db, config_dir_path,use_pads,ext_lib_string, rts_hload);\r
-\r
-\r
- fprintf(qtree_output,"</QueryNodes>\n");\r
-\r
- return(0);\r
-}\r
-\r
-////////////////////////////////////////////////////////////\r
-\r
-void generate_makefile(vector<string> &input_file_names, int nfiles,\r
- vector<string> &hfta_names, opview_set &opviews,\r
- vector<string> &machine_names,\r
- string schema_file_name,\r
- vector<string> &interface_names,\r
- ifq_t *ifdb, string &config_dir_path,\r
- bool use_pads,\r
- string extra_libs,\r
- map<string, vector<int> > &rts_hload\r
- ){\r
- int i,j;\r
-\r
- if(config_dir_path != ""){\r
- config_dir_path = "-C "+config_dir_path;\r
- }\r
-\r
- struct stat sb;\r
- bool libz_exists = stat((root_path+"/lib/libz.a").c_str(),&sb) >= 0;\r
- bool libast_exists = stat((root_path+"/lib/libast.a").c_str(),&sb) >= 0;\r
-\r
-// if(libz_exists && !libast_exists){\r
-//		fprintf(stderr,"Configuration ERROR: ../../lib/libz.a exists but ../../lib/libast.a does not.\n");\r
-// exit(1);\r
-// }\r
-\r
-// Get set of operator executable files to run\r
- set<string> op_fls;\r
- set<string>::iterator ssi;\r
- for(i=0;i<opviews.size();++i){\r
- opview_entry *opv = opviews.get_entry(i);\r
- if(opv->exec_fl != "") op_fls.insert(opv->exec_fl);\r
- }\r
-\r
- FILE *outfl = fopen("Makefile", "w");\r
- if(outfl==NULL){\r
- fprintf(stderr,"Can't open Makefile for write, exiting.\n");\r
- exit(0);\r
- }\r
-\r
- fputs(\r
-("CPP= g++ -O3 -g -I "+root_path+"/include -I "+root_path+"/include/hfta\n"\r
-"CC= gcc -O3 -g -I . -I "+root_path+"/include -I "+root_path+"/include/lfta"\r
-).c_str(), outfl\r
-);\r
- if(generate_stats)\r
- fprintf(outfl," -DLFTA_STATS");\r
-\r
-// Gather the set of interfaces\r
-// Also, gather "base interface names" for use in computing\r
-// the hash splitting to virtual interfaces.\r
-// TODO : must update to handle machines\r
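-// Virtual interface names end in 'X' followed by a number; stripping at the\r
-// last 'X' recovers the base device name used when computing the hash\r
-// splitting to virtual interfaces.\r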
- set<string> ifaces;\r
- set<string> base_vifaces; // base interfaces of virtual interfaces\r
- map<string, string> ifmachines;\r
- map<string, string> ifattrs;\r
- for(i=0;i<interface_names.size();++i){\r
- ifaces.insert(interface_names[i]);\r
- ifmachines[interface_names[i]] = machine_names[i];\r
-\r
- size_t Xpos = interface_names[i].find_last_of("X");\r
- if(Xpos!=string::npos){\r
- string iface = interface_names[i].substr(0,Xpos);\r
- base_vifaces.insert(iface);\r
- }\r
- // get interface attributes and add them to the list\r
- }\r
-\r
-// Do we need to include protobuf libraries?\r
- bool use_proto = false;\r
- int erri;\r
- string err_str;\r
- for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){\r
- string ifnm = (*ssi);\r
- vector<string> ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "InterfaceType", erri, err_str);\r
- for(int ift_i=0;ift_i<ift.size();ift_i++){\r
- if(ift[ift_i]=="PROTO"){\r
- use_proto = true;\r
- }\r
- }\r
- }\r
-\r
- fprintf(outfl,\r
-"\n"\r
-"\n"\r
-"all: rts");\r
- for(i=0;i<hfta_names.size();++i)\r
- fprintf(outfl," %s",hfta_names[i].c_str());\r
- fputs(\r
-("\n"\r
-"\n"\r
-"rts: lfta.o "+root_path+"/lib/libgscphost.a "+root_path+"/lib/libgscplftaaux.a "+root_path+"/lib/libgscprts.a "+root_path+"/lib/libclearinghouse.a\n"\r
-"\tg++ -O3 -g -o rts lfta.o ").c_str(), outfl);\r
- if(use_pads)\r
- fprintf(outfl,"-L. ");\r
- fputs(\r
-("-L"+root_path+"/lib -lgscplftaaux ").c_str(), outfl);\r
- if(use_pads)\r
- fprintf(outfl,"-lgscppads -lpads ");\r
- fprintf(outfl,\r
-"-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux -lclearinghouse -lresolv -lpthread -lgscpinterface -lz");\r
- if(use_pads)\r
- fprintf(outfl, " -lpz -lz -lbz ");\r
- if(libz_exists && libast_exists)\r
- fprintf(outfl," -last ");\r
- if(use_pads)\r
- fprintf(outfl, " -ldll -ldl ");\r
- if(use_proto)\r
- fprintf(outfl, " -L/usr/local/lib/ -lprotobuf-c ");\r
- fprintf(outfl," -lgscpaux");\r
-#ifdef GCOV\r
- fprintf(outfl," -fprofile-arcs");\r
-#endif\r
- fprintf(outfl,\r
-"\n"\r
-"\n"\r
-"lfta.o: %s_lfta.c\n"\r
-"\t$(CC) -o lfta.o -c %s_lfta.c\n"\r
-"\n"\r
-"%s_lfta.c: external_fcns.def %s ",hostname.c_str(), hostname.c_str(), hostname.c_str(),schema_file_name.c_str());\r
- for(i=0;i<nfiles;++i)\r
- fprintf(outfl," %s",input_file_names[i].c_str());\r
- if(hostname == ""){\r
- fprintf(outfl,"\n\t%s/bin/translate_fta %s %s ",root_path.c_str(), config_dir_path.c_str(),schema_file_name.c_str());\r
- }else{\r
- fprintf(outfl,"\n\t%s/bin/translate_fta -h %s %s %s ", root_path.c_str(), hostname.c_str(), config_dir_path.c_str(),schema_file_name.c_str());\r
- }\r
- for(i=0;i<nfiles;++i)\r
- fprintf(outfl," %s",input_file_names[i].c_str());\r
- fprintf(outfl,"\n");\r
-\r
- for(i=0;i<hfta_names.size();++i)\r
- fprintf(outfl,\r
-("%s: %s.o\n"\r
-"\t$(CPP) -o %s %s.o -L"+root_path+"/lib -lgscpapp -lgscphostaux -lgscphost -lgscpinterface -lgscphftaaux -lgscphostaux -lm -lgscpaux -lclearinghouse -lresolv -lpthread -lgscpaux -lgscphftaaux -lgscpaux %s\n"\r
-"\n"\r
-"%s.o: %s.cc\n"\r
-"\t$(CPP) -o %s.o -c %s.cc\n"\r
-"\n"\r
-"\n").c_str(),\r
- hfta_names[i].c_str(), hfta_names[i].c_str(),\r
- hfta_names[i].c_str(), hfta_names[i].c_str(), extra_libs.c_str(),\r
- hfta_names[i].c_str(), hfta_names[i].c_str(),\r
- hfta_names[i].c_str(), hfta_names[i].c_str()\r
- );\r
-\r
- fprintf(outfl,\r
-("\n"\r
-"packet_schema.txt:\n"\r
-"\tln -s "+root_path+"/cfg/packet_schema.txt .\n"\r
-"\n"\r
-"external_fcns.def:\n"\r
-"\tln -s "+root_path+"/cfg/external_fcns.def .\n"\r
-"\n"\r
-"clean:\n"\r
-"\trm -rf core rts *.o %s_lfta.c external_fcns.def packet_schema.txt").c_str(),hostname.c_str());\r
- for(i=0;i<hfta_names.size();++i)\r
- fprintf(outfl," %s %s.cc",hfta_names[i].c_str(),hfta_names[i].c_str());\r
- fprintf(outfl,"\n");\r
-\r
- fclose(outfl);\r
-\r
-\r
-\r
-// Gather the set of interfaces\r
-// TODO : must update to handle machines\r
-// TODO : lookup interface attributes and add them as a parameter to rts process\r
- outfl = fopen("runit", "w");\r
- if(outfl==NULL){\r
- fprintf(stderr,"Can't open runit for write, exiting.\n");\r
- exit(0);\r
- }\r
-\r
-\r
- fputs(\r
-("#!/bin/sh\n"\r
-"./stopit\n"\r
-+root_path+"/bin/gshub.py> /dev/null 2>&1 &\n"\r
-"sleep 5\n"\r
-"if [ ! -f gshub.log ]\n"\r
-"then\n"\r
-"\techo \"Failed to start bin/gshub.py\"\n"\r
-"\texit -1\n"\r
-"fi\n"\r
-"ADDR=`cat gshub.log`\n"\r
-"ps opgid= $! >> gs.pids\n"\r
-"./rts $ADDR default ").c_str(), outfl);\r
-// int erri;\r
-// string err_str;\r
- for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){\r
- string ifnm = (*ssi);\r
- fprintf(outfl, "%s ",ifnm.c_str());\r
- vector<string> ifv = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "Command", erri, err_str);\r
- for(j=0;j<ifv.size();++j)\r
- fprintf(outfl, "%s ",ifv[j].c_str());\r
- }\r
- fprintf(outfl, " &\n");\r
- fprintf(outfl, "echo $! >> gs.pids\n");\r
- for(i=0;i<hfta_names.size();++i)\r
- fprintf(outfl,"./%s $ADDR default &\n",hfta_names[i].c_str());\r
-\r
- for(j=0;j<opviews.opview_list.size();++j){\r
- fprintf(outfl,"%s/views/%s %s &\n",root_path.c_str(),opviews.opview_list[j]->exec_fl.c_str(), opviews.opview_list[j]->mangler.c_str());\r
- }\r
-\r
- fclose(outfl);\r
- system("chmod +x runit");\r
-\r
- outfl = fopen("stopit", "w");\r
- if(outfl==NULL){\r
- fprintf(stderr,"Can't open stopit for write, exiting.\n");\r
- exit(0);\r
- }\r
-\r
- fprintf(outfl,"#!/bin/sh\n"\r
-"rm -f gshub.log\n"\r
-"if [ ! -f gs.pids ]\n"\r
-"then\n"\r
-"exit\n"\r
-"fi\n"\r
-"for pgid in `cat gs.pids`\n"\r
-"do\n"\r
-"kill -TERM -$pgid\n"\r
-"done\n"\r
-"sleep 1\n"\r
-"for pgid in `cat gs.pids`\n"\r
-"do\n"\r
-"kill -9 -$pgid\n"\r
-"done\n"\r
-"rm gs.pids\n");\r
-\r
- fclose(outfl);\r
- system("chmod +x stopit");\r
-\r
-//-----------------------------------------------\r
-\r
-/* For now disable support for virtual interfaces\r
- outfl = fopen("set_vinterface_hash.bat", "w");\r
- if(outfl==NULL){\r
- fprintf(stderr,"Can't open set_vinterface_hash.bat for write, exiting.\n");\r
- exit(0);\r
- }\r
-\r
-// The format should be determined by an entry in the ifres.xml file,\r
-// but for now hardcode the only example I have.\r
- for(ssi=base_vifaces.begin();ssi!=base_vifaces.end();++ssi){\r
- if(rts_hload.count((*ssi))){\r
- string iface_name = (*ssi);\r
- string iface_number = "";\r
- for(j=iface_name.size()-1; j>0 && iface_number == ""; j--){\r
- if(isdigit(iface_name[j])){\r
- iface_number = iface_name[j];\r
- if(j>0 && isdigit(iface_name[j-1]))\r
- iface_number = iface_name[j-1] + iface_number;\r
- }\r
- }\r
-\r
- fprintf(outfl,"dagconfig -d%s -S hat_range=",iface_number.c_str());\r
- vector<int> halloc = rts_hload[iface_name];\r
- int prev_limit = 0;\r
- for(j=0;j<halloc.size();++j){\r
- if(j>0)\r
- fprintf(outfl,":");\r
- fprintf(outfl,"%d-%d",prev_limit,halloc[j]);\r
- prev_limit = halloc[j];\r
- }\r
- fprintf(outfl,"\n");\r
- }\r
- }\r
- fclose(outfl);\r
- system("chmod +x set_vinterface_hash.bat");\r
-*/\r
-}\r
-\r
-// Code for implementing a local schema\r
-/*\r
- table_list qpSchema;\r
-\r
-// Load the schemas of any LFTAs.\r
- int l;\r
- for(l=0;l<hfta_nbr;++l){\r
- stream_query *sq0 = split_queries[l];\r
- table_def *td = sq0->get_output_tabledef();\r
- qpSchema.append_table(td);\r
- }\r
-// load the schemas of any other ref'd tables.\r
-// (e.g., hftas)\r
- vector<tablevar_t *> input_tbl_names = split_queries[hfta_nbr]->get_input_tables();\r
- int ti;\r
- for(ti=0;ti<input_tbl_names.size();++ti){\r
- int tbl_ref = qpSchema.get_table_ref(input_tbl_names[ti]->get_schema_name());\r
- if(tbl_ref < 0){\r
- tbl_ref = Schema->get_table_ref(input_tbl_names[ti]->get_schema_name());\r
- if(tbl_ref < 0){\r
- fprintf(stderr,"ERROR file %s references table %s, which is not in the schema.\n",input_file_names[i].c_str(), (input_tbl_names[ti]->get_schema_name()).c_str());\r
- exit(1);\r
- }\r
- qpSchema.append_table(Schema->get_table(tbl_ref));\r
- }\r
- }\r
-*/\r
-\r
-// Functions related to parsing.\r
-\r
-/*\r
-static int split_string(char *instr,char sep, char **words,int max_words){\r
- char *loc;\r
- char *str;\r
- int nwords = 0;\r
-\r
- str = instr;\r
- words[nwords++] = str;\r
- while( (loc = strchr(str,sep)) != NULL){\r
- *loc = '\0';\r
- str = loc+1;\r
- if(nwords >= max_words){\r
- fprintf(stderr,"Error in split_string, too many words discovered (max is %d)\n",max_words);\r
- nwords = max_words-1;\r
- }\r
- words[nwords++] = str;\r
- }\r
-\r
- return(nwords);\r
-}\r
-\r
-*/\r
-\r
+/* ------------------------------------------------
+Copyright 2014 AT&T Intellectual Property
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ ------------------------------------------- */
+
+#include<unistd.h> // for gethostname
+
+#include <string>
+#include "parse_fta.h"
+#include "parse_schema.h"
+#include "parse_ext_fcns.h"
+#include"analyze_fta.h"
+#include"query_plan.h"
+#include"generate_lfta_code.h"
+#include"stream_query.h"
+#include"generate_utils.h"
+#include"nic_def.h"
+#include"generate_nic_code.h"
+
+#include <stdlib.h>
+#include <stdio.h>
+#include<ctype.h>
+#include<glob.h>
+#include<string.h>
+
+#include<list>
+
+// for the scandir
+ #include <sys/types.h>
+ #include <dirent.h>
+
+
+#include<errno.h>
+
+// to verify that some files exist.
+ #include <sys/types.h>
+ #include <sys/stat.h>
+
+#include "parse_partn.h"
+
+#include "print_plan.h"
+
+// Interface to the xml parser
+
+#include"xml_t.h"
+#include"field_list.h"
+
+#include "gsconfig.h"
+
+extern int xmlParserparse(void);
+extern FILE *xmlParserin;
+extern int xmlParserdebug;
+
+std::vector<std::string> xml_attr_vec;
+std::vector<std::string> xml_val_vec;
+std::string xml_a, xml_v;
+xml_t *xml_leaves = NULL;
+
+// Interface to the field list verifier
+field_list *field_verifier = NULL;
+
+#define TMPSTRLEN 1000
+
+#ifndef PATH_DELIM
+ #define PATH_DELIM '/'
+#endif
+
+char tmp_schema_str[10000];
+
+// maximum delay between two heartbeats produced
+// by a UDOP. Used when it's not explicitly
+// provided in the udop definition
+#define DEFAULT_UDOP_LIVENESS_TIMEOUT 5
+
+// Default lfta hash table size, must be power of 2.
+int DEFAULT_LFTA_HASH_TABLE_SIZE = 4096;
+
+// Interface to FTA definition lexer and parser ...
+
+extern int FtaParserparse(void);
+extern FILE *FtaParserin;
+extern int FtaParserdebug;
+
+fta_parse_t *fta_parse_result;
+var_defs_t *fta_parse_defines;
+
+
+
+// Interface to external function lexer and parser ...
+
+extern int Ext_fcnsParserparse(void);
+extern FILE *Ext_fcnsParserin;
+extern int Ext_fcnsParserdebug;
+
+ext_fcn_list *Ext_fcns;
+
+
+// Interface to partition definition parser
+extern int PartnParserparse();
+partn_def_list_t *partn_parse_result = NULL;
+
+
+
+using namespace std;
+//extern int errno;
+
+
+// forward declaration of local utility function
+void generate_makefile(vector<string> &input_file_names, int nfiles,
+ vector<string> &hfta_names, opview_set &opviews,
+ vector<string> &machine_names,
+ string schema_file_name,
+ vector<string> &interface_names,
+ ifq_t *ifdb, string &config_dir_path,
+ bool use_pads,
+ string extra_libs,
+ map<string, vector<int> > &rts_hload
+ );
+
+//static int split_string(char *instr,char sep, char **words,int max_words);
+#define MAXFLDS 100
+
+ FILE *schema_summary_output = NULL; // query names
+
+// Dump schema summary
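+// Output: the query name on one line, then a '|'-separated line of field names,
+// then a '|'-separated line of field types.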
+void dump_summary(stream_query *str){
+ fprintf(schema_summary_output,"%s\n",str->query_name.c_str());
+
+ table_def *sch = str->get_output_tabledef();
+
+ vector<field_entry *> flds = sch->get_fields();
+ int f;
+ for(f=0;f<flds.size();++f){
+ if(f>0) fprintf(schema_summary_output,"|");
+ fprintf(schema_summary_output,"%s",flds[f]->get_name().c_str());
+ }
+ fprintf(schema_summary_output,"\n");
+ for(f=0;f<flds.size();++f){
+ if(f>0) fprintf(schema_summary_output,"|");
+ fprintf(schema_summary_output,"%s",flds[f]->get_type().c_str());
+ }
+ fprintf(schema_summary_output,"\n");
+}
+
+// Globals
+string hostname; // name of current host.
+int hostname_len;
+bool generate_stats = false;
+string root_path = "../..";
+
+
+int main(int argc, char **argv){
+ char tmpstr[TMPSTRLEN];
+ string err_str;
+ int q,s,h,f;
+
+ set<int>::iterator si;
+
+ vector<string> registration_query_names; // for lfta.c registration
+ map<string, vector<int> > mach_query_names; // list queries of machine
+ vector<int> snap_lengths; // for lfta.c registration
+ vector<int> snap_position; // for lfta.c registration
+ vector<string> interface_names; // for lfta.c registration
+ vector<string> machine_names; // machine of interface
+ vector<bool> lfta_reuse_options; // for lfta.c registration
+	vector<int> lfta_liveness_timeouts; // for qtree.xml generation
+	vector<string> hfta_names;			// hfta source code names, for
+ // creating make file.
+ vector<string> qnames; // ensure unique names
+ map<string, int> lfta_names; // keep track of unique lftas.
+
+
+// set these to 1 to debug the parser
+ FtaParserdebug = 0;
+ Ext_fcnsParserdebug = 0;
+
+ FILE *lfta_out; // lfta.c output.
+ FILE *fta_in; // input file
+ FILE *table_schemas_in; // source tables definition file
+ FILE *query_name_output; // query names
+ FILE *qtree_output; // interconnections of query nodes
+
+ // -------------------------------
+ // Handling of Input Arguments
+ // -------------------------------
+ char optstr[] = "BDpLC:l:HNQMf:PSh:n:cR:";
+	const char *usage_str = "Usage: %s [-B] [-D] [-p] [-L] [-N] [-H] [-Q] [-M] [-C <config directory>] [-l <library directory>] [-f] [-P] [-S] [-c] [-n n_virtual_interfaces] [-h hostname] [-R root_path] [schema_file] input_file [input file ...]\n"
+ "\t[-B] : debug only (don't create output files)\n"
+ "\t[-D] : distributed mode (will use cluster.ifq instead of local .ifq file)\n"
+ "\t[-p] : partitioned mode (will use partition information in partition.txt to drive query optimization)\n"
+ "\t[-L] : use live_hosts.txt file to restrict queries to a set of live hosts\n"
+ "\t[-C] : use <config directory> for definition files\n"
+ "\t[-l] : use <library directory> for library queries\n"
+ "\t[-N] : output query names in query_names.txt\n"
+ "\t[-H] : create HFTA only (no schema_file)\n"
+ "\t[-Q] : use query name for hfta suffix\n"
+ "\t[-M] : generate make file and runit, stopit scripts\n"
+ "\t[-S] : enable LFTA statistics (alters Makefile).\n"
+ "\t[-f] : Output schema summary to schema_summary.txt\n"
+ "\t[-P] : link with PADS\n"
+ "\t[-h] : override host name.\n"
+ "\t[-c] : clean out Makefile and hfta_*.cc first.\n"
+ "\t[-R] : path to root of GS-lite\n"
+;
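+// Typical invocation (illustrative; the binary name and flag set depend on the build):
+//   translate_fta -M -C ./cfg packet_schema.txt queries.gsql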
+
+// parameters gathered from command line processing
+ string external_fcns_path;
+// string internal_fcn_path;
+ string config_dir_path;
+ string library_path = "./";
+ vector<string> input_file_names;
+ string schema_file_name;
+ bool debug_only = false;
+ bool hfta_only = false;
+ bool output_query_names = false;
+ bool output_schema_summary=false;
+ bool numeric_hfta_flname = true;
+ bool create_makefile = false;
+ bool distributed_mode = false;
+ bool partitioned_mode = false;
+ bool use_live_hosts_file = false;
+ bool use_pads = false;
+ bool clean_make = false;
+ int n_virtual_interfaces = 1;
+
+ int chopt;	// int, not char: getopt() returns -1 at the end of the options
+ while((chopt = getopt(argc,argv,optstr)) != -1){
+ switch(chopt){
+ case 'B':
+ debug_only = true;
+ break;
+ case 'D':
+ distributed_mode = true;
+ break;
+ case 'p':
+ partitioned_mode = true;
+ break;
+ case 'L':
+ use_live_hosts_file = true;
+ break;
+ case 'C':
+ if(optarg != NULL)
+ config_dir_path = string(optarg) + string("/");
+ break;
+ case 'l':
+ if(optarg != NULL)
+ library_path = string(optarg) + string("/");
+ break;
+ case 'N':
+ output_query_names = true;
+ break;
+ case 'Q':
+ numeric_hfta_flname = false;
+ break;
+ case 'H':
+ if(schema_file_name == ""){
+ hfta_only = true;
+ }
+ break;
+ case 'f':
+ output_schema_summary=true;
+ break;
+ case 'M':
+ create_makefile=true;
+ break;
+ case 'S':
+ generate_stats=true;
+ break;
+ case 'P':
+ use_pads = true;
+ break;
+ case 'c':
+ clean_make = true;
+ break;
+ case 'h':
+ if(optarg != NULL)
+ hostname = optarg;
+ break;
+ case 'R':
+ if(optarg != NULL)
+ root_path = optarg;
+ break;
+ case 'n':
+ if(optarg != NULL){
+ n_virtual_interfaces = atoi(optarg);
+ if(n_virtual_interfaces < 1 || n_virtual_interfaces > 128){
+ fprintf(stderr,"Warning, %d virtual interfaces specified, valid values are between 1 and 128 inclusive\n",n_virtual_interfaces);
+ n_virtual_interfaces = 1;
+ }
+ }
+ break;
+ case '?':
+ fprintf(stderr,"Error, argument %c not recognized.\n",optopt);
+ fprintf(stderr,"%s\n", usage_str);
+ exit(1);
+ default:
+ fprintf(stderr, "Argument was %c\n", optopt);
+ fprintf(stderr,"Invalid arguments\n");
+ fprintf(stderr,"%s\n", usage_str);
+ exit(1);
+ }
+ }
+ argc -= optind;
+ argv += optind;
+ for (int i = 0; i < argc; ++i) {
+ if((schema_file_name == "") && !hfta_only){
+ schema_file_name = argv[i];
+ }else{
+ input_file_names.push_back(argv[i]);
+ }
+ }
+
+ if(input_file_names.size() == 0){
+ fprintf(stderr,"%s\n", usage_str);
+ exit(1);
+ }
+
+ if(clean_make){
+ string clean_cmd = "rm Makefile hfta_*.cc";
+ int clean_ret = system(clean_cmd.c_str());
+ if(clean_ret){
+ fprintf(stderr,"Warning, return value %d when trying to clean out old source files.\n", clean_ret);
+ }
+ }
+
+
+ nic_prop_db *npdb = new nic_prop_db(config_dir_path);
+
+// Open globally used file names.
+
+ // prepend config directory to schema file
+ schema_file_name = config_dir_path + schema_file_name;
+ external_fcns_path = config_dir_path + string("external_fcns.def");
+ string ifx_fname = config_dir_path + string("ifres.xml");
+
+// Find interface query file(s).
+ if(hostname == ""){
+ gethostname(tmpstr,TMPSTRLEN);
+ hostname = tmpstr;
+ }
+ hostname_len = hostname.length();	// use hostname itself; tmpstr is unset when -h was given
+ string ifq_fname = config_dir_path + (distributed_mode ? "cluster" : hostname) + string(".ifq");
+ vector<string> ifq_fls;
+
+ ifq_fls.push_back(ifq_fname);
+
+
+// Get the field list, if it exists
+ string flist_fl = config_dir_path + "field_list.xml";
+ FILE *flf_in = NULL;
+ if((flf_in = fopen(flist_fl.c_str(), "r")) != NULL) {
+ fprintf(stderr,"Found field list file %s\n",flist_fl.c_str());
+ xml_leaves = new xml_t();
+ xmlParser_setfileinput(flf_in);
+ if(xmlParserparse()){
+ fprintf(stderr,"WARNING, could not parse field list file %s\n",flist_fl.c_str());
+ }else{
+ field_verifier = new field_list(xml_leaves);
+ }
+ }
+
+ if(!hfta_only){
+ if((table_schemas_in = fopen(schema_file_name.c_str(), "r")) == NULL) {
+ fprintf(stderr,"Can't open schema file %s\n%s\n",schema_file_name.c_str(),strerror(errno));
+ exit(1);
+ }
+ }
+
+/*
+ if(!(debug_only || hfta_only)){
+ if((lfta_out = fopen("lfta.c","w")) == NULL){
+ fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
+ exit(1);
+ }
+ }
+*/
+
+// Get the output specification file.
+// format is
+//		query, operator, operator_param, directory, bucketwidth, partitioning_fields, n_partitions
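+//		e.g. (illustrative only -- operator names depend on the installed output operators):
+//			my_query,stream,none,./output,300,srcIP,1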
+ string ospec_fl = "output_spec.cfg";
+ FILE *osp_in = NULL;
+ vector<ospec_str *> output_specs;
+ multimap<string, int> qname_to_ospec;
+ if((osp_in = fopen(ospec_fl.c_str(), "r")) != NULL) {
+ char *flds[MAXFLDS];
+ int o_lineno = 0;
+ while(fgets(tmpstr,TMPSTRLEN,osp_in)){
+ o_lineno++;
+ int nflds = split_string(tmpstr,',',flds,MAXFLDS);
+ if(tmpstr[0]!='\n' && tmpstr[0]!='\r' && tmpstr[0]!='\0' && tmpstr[0]!='#'){
+ if(nflds == 7){
+// make operator type lowercase
+ char *tmpc;
+ for(tmpc=flds[1];*tmpc!='\0';++tmpc)
+ *tmpc = tolower(*tmpc);
+
+ ospec_str *tmp_ospec = new ospec_str();
+ tmp_ospec->query = flds[0];
+ tmp_ospec->operator_type = flds[1];
+ tmp_ospec->operator_param = flds[2];
+ tmp_ospec->output_directory = flds[3];
+ tmp_ospec->bucketwidth = atoi(flds[4]);
+ tmp_ospec->partitioning_flds = flds[5];
+ tmp_ospec->n_partitions = atoi(flds[6]);
+ qname_to_ospec.insert(pair<string,int>(tmp_ospec->query,output_specs.size()));
+ output_specs.push_back(tmp_ospec);
+ }else{
+ fprintf(stderr,"Warning, line %d corrupted in output_spec.cfg, has %d fields.\n",o_lineno,nflds);
+ }
+ }
+ }
+ fclose(osp_in);
+ }else{
+		fprintf(stderr,"output_spec.cfg not found.  The query set has no output.  Exiting.\n");
+ exit(1);
+ }
+
+// hfta parallelism
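+// Each line is "<hfta name>,<parallelism>"; the parallelism must lie in
+// [1, n_virtual_interfaces] and divide it evenly, e.g. (illustrative): my_hfta,2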
+ string pspec_fl = "hfta_parallelism.cfg";
+ FILE *psp_in = NULL;
+ map<string, int> hfta_parallelism;
+ if((psp_in = fopen(pspec_fl.c_str(), "r")) != NULL){
+ char *flds[MAXFLDS];
+ int o_lineno = 0;
+ while(fgets(tmpstr,TMPSTRLEN,psp_in)){
+ bool good_entry = true;
+ o_lineno++;
+ int nflds = split_string(tmpstr,',',flds,MAXFLDS);
+ if(nflds == 2){
+ string hname = flds[0];
+ int par = atoi(flds[1]);
+ if(par <= 0 || par > n_virtual_interfaces){
+ fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must be between 1 and %d\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
+ good_entry = false;
+ }
+ if(good_entry && n_virtual_interfaces % par != 0){
+ fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must divide the number of virtual interfaces (%d), ignoring.\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
+ good_entry = false;
+ }
+ if(good_entry)
+ hfta_parallelism[hname] = par;
+ }
+ }
+ }else{
+ fprintf(stderr,"WARNING, no file %s found, using single copies of hftas.\n",pspec_fl.c_str());
+ }
+
+
+// LFTA hash table sizes
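+// Each line is "<lfta name>,<hash table size>" with size > 0,
+// e.g. (illustrative): my_lfta,8192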
+ string htspec_fl = "lfta_htsize.cfg";
+ FILE *htsp_in = NULL;
+ map<string, int> lfta_htsize;
+ if((htsp_in = fopen(htspec_fl.c_str(), "r")) != NULL){
+ char *flds[MAXFLDS];
+ int o_lineno = 0;
+ while(fgets(tmpstr,TMPSTRLEN,htsp_in)){
+ bool good_entry = true;
+ o_lineno++;
+ int nflds = split_string(tmpstr,',',flds,MAXFLDS);
+ if(nflds == 2){
+ string lfta_name = flds[0];
+ int htsz = atoi(flds[1]);
+ if(htsz>0){
+ lfta_htsize[lfta_name] = htsz;
+ }else{
+ fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, htsize is %d, must be larger than 0.\n",o_lineno,htspec_fl.c_str(),htsz);
+ }
+ }
+ }
+ }else{
+ fprintf(stderr,"WARNING, no file %s found, using default LFTA hash table sizes.\n",htspec_fl.c_str());
+ }
+
+// LFTA virtual interface hash split
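+// Each line is "<interface>,<share_1>,...,<share_k>"; the positive shares are
+// accumulated into cumulative hash-load boundaries, one per virtual interface,
+// e.g. (illustrative): eth0,1,1,2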
+ string rtlspec_fl = "rts_load.cfg";
+ FILE *rtl_in = NULL;
+ map<string, vector<int> > rts_hload;
+ if((rtl_in = fopen(rtlspec_fl.c_str(), "r")) != NULL){
+ char *flds[MAXFLDS];
+ int r_lineno = 0;
+ string iface_name;
+ vector<int> hload;
+ while(fgets(tmpstr,TMPSTRLEN,rtl_in)){
+ bool good_entry = true;
+ r_lineno++;
+ iface_name = "";
+ hload.clear();
+ int nflds = split_string(tmpstr,',',flds,MAXFLDS);
+ if(nflds >1){
+ iface_name = flds[0];
+ int cumm_h = 0;
+ int j;
+ for(j=1;j<nflds;++j){
+ int h = atoi(flds[j]);
+ if(h<=0)
+ good_entry = false;
+ cumm_h += h;
+ hload.push_back(cumm_h);
+ }
+ }else{
+ good_entry = false;
+ }
+ if(good_entry){
+ rts_hload[iface_name] = hload;
+ }else{
+ fprintf(stderr,"Warning, line %d of %s is malformed, skipping.\n",r_lineno, rtlspec_fl.c_str());
+ }
+ }
+ }
+
+
+
+ if(output_query_names){
+ if((query_name_output = fopen("query_names.txt","w")) == NULL){
+ fprintf(stderr,"Can't open output file %s\n%s\n","query_names.txt",strerror(errno));
+ exit(1);
+ }
+ }
+
+ if(output_schema_summary){
+ if((schema_summary_output = fopen("schema_summary.txt","w")) == NULL){
+ fprintf(stderr,"Can't open output file %s\n%s\n","schema_summary.txt",strerror(errno));
+ exit(1);
+ }
+ }
+
+ if((qtree_output = fopen("qtree.xml","w")) == NULL){
+ fprintf(stderr,"Can't open output file %s\n%s\n","qtree.xml",strerror(errno));
+ exit(1);
+ }
+ fprintf(qtree_output,"<?xml version='1.0' encoding='ISO-8859-1'?>\n");
+ fprintf(qtree_output,"<?xml-stylesheet type='text/xsl' href='qtree.xsl'?>\n");
+ fprintf(qtree_output,"<QueryNodes>\n");
+
+
+// Get an initial Schema
+ table_list *Schema;
+ if(!hfta_only){
+// Parse the table schema definitions.
+ fta_parse_result = new fta_parse_t();
+ FtaParser_setfileinput(table_schemas_in);
+ if(FtaParserparse()){
+ fprintf(stderr,"Table schema parse failed.\n");
+ exit(1);
+ }
+ if(fta_parse_result->parse_type != TABLE_PARSE){
+ fprintf(stderr,"ERROR, file %s is not a table definition file.\n",schema_file_name.c_str());
+ exit(1);
+ }
+ Schema = fta_parse_result->tables;
+
+// Ensure that all schema_ids, if set, are distinct.
+// Obsolete? There is code elsewhere to ensure that schema IDs are
+// distinct on a per-interface basis.
+/*
+ set<int> found_ids;
+ set<int> dup_ids;
+ for(int t=0;t<Schema->size();++t){
+ int sch_id = Schema->get_table(t)->get_schema_id();
+ if(sch_id> 0){
+ if(found_ids.find(sch_id) != found_ids.end()){
+ dup_ids.insert(sch_id);
+ }else{
+ found_ids.insert(sch_id);
+ }
+ }
+ if(dup_ids.size()>0){
+ fprintf(stderr, "Error, the schema has duplicate schema_ids:");
+ for(auto dit=dup_ids.begin();dit!=dup_ids.end(); ++dit)
+ fprintf(stderr," %d",(*dit));
+ fprintf(stderr,"\n");
+ exit(1);
+ }
+ }
+*/
+
+
+// Process schema field inheritance
+ int retval;
+ retval = Schema->unroll_tables(err_str);
+ if(retval){
+ fprintf(stderr,"Error processing schema field inheritance:\n %s\n", err_str.c_str() );
+ exit(1);
+ }
+ }else{
+// hfta only => we will try to fetch schemas from the registry.
+// therefore, start off with an empty schema.
+ Schema = new table_list();
+ }
+
+
+// Open and parse the external functions file.
+ Ext_fcnsParserin = fopen(external_fcns_path.c_str(),"r");
+ if(Ext_fcnsParserin == NULL){
+ fprintf(stderr,"Warning, can't find external functions definition file (external_fcns.def).\n");
+ Ext_fcns = new ext_fcn_list();
+ }else{
+ if(Ext_fcnsParserparse()){
+ fprintf(stderr,"Warning, error parsing external functions definition file (external_fcns.def).\n");
+ Ext_fcns = new ext_fcn_list();
+ }
+ }
+ if(Ext_fcns->validate_fcns(err_str)){
+ fprintf(stderr,"Error processing external functions definition file (external_fcns.def):\n%s\n",err_str.c_str());
+ exit(1);
+ }
+
+// Open and parse the interface resources file.
+// ifq_t *ifaces_db = new ifq_t();
+// string ierr;
+// if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
+// fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
+// ifx_fname.c_str(), ierr.c_str());
+// exit(1);
+// }
+// if(ifaces_db->load_ifqs(ifq_fname, ierr)){
+// fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
+// ifq_fname.c_str(), ierr.c_str());
+// exit(1);
+// }
+
+
+// The LFTA code string.
+// Put the standard preamble here.
+//		NOTE: the hash macros and fcns should go into the run time library
+ map<string, string> lfta_val;
+ map<string, string> lfta_prefilter_val;
+
+ string lfta_header =
+"#include <limits.h>\n"
+"#include \"rts.h\"\n"
+"#include \"fta.h\"\n"
+"#include \"lapp.h\"\n"
+"#include \"rts_udaf.h\"\n"
+"#include<stdio.h>\n"
+"#include <float.h>\n"
+"#include \"rdtsc.h\"\n"
+"#include \"watchlist.h\"\n\n"
+;
+// Get any locally defined parsing headers
+ glob_t glob_result;
+ memset(&glob_result, 0, sizeof(glob_result));
+
+ // do the glob operation. TODO: derive this path from the GS root (root_path) instead of hard-coding it
+ int return_value = glob("../../include/lfta/local/*h", GLOB_TILDE, NULL, &glob_result);
+ if(return_value == 0){
+ lfta_header += "\n";
+ for(size_t i = 0; i < glob_result.gl_pathc; ++i) {
+ char *flds[1000];
+ int nflds = split_string(glob_result.gl_pathv[i],'/',flds,1000);
+ lfta_header += "#include \"local/"+string(flds[nflds-1])+"\"\n";
+ }
+ lfta_header += "\n";
+ }else{
+ fprintf(stderr,"Warning, glob on ../../include/lfta/local/*h failed.\n");
+ }
+
+/*
+"#define IS_FILLED(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> ((bucket & 15)<<1)))\n"
+"#define IS_NEW(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> (((bucket & 15) << 1) + 1)))\n"
+"#define SET_EMPTY(bitmap,bucket) (bitmap[bucket >> 4] &= (~(0x80000000 >> ((bucket & 15)<<1))))\n"
+"#define SET_FILLED_AND_NEW(bitmap,bucket) (bitmap[bucket >> 4] |= (0xC0000000 >> ((bucket & 15)<<1)))\n"
+*/
+
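+// Emit the run-time helpers every generated lfta.c relies on: the prefilter function
+// pointer, hash-slot status bits, Bloom-filter bit macros, and per-type lfta_*_to_hash helpers.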
+ lfta_header +=
+"\n"
+"gs_uint64_t (*lfta_prefilter)(void *pkt) = NULL; // set at fta_init\n"
+"\n"
+"#define SLOT_FILLED 0x04\n"
+"#define SLOT_GEN_BITS 0x03\n"
+"#define SLOT_HASH_BITS 0xfffffff8\n"
+"#define SET_BF_BIT(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num)) + (bf_index)) >> 3] |= (0x80 >> ((((bucket) * (bf_num)) + (bf_index)) & 7)))\n"
+"#define IS_BF_SET(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num))+(bf_index)) >> 3] & (0x80 >> ((((bucket) * (bf_num))+(bf_index)) & 7)))\n"
+"#define SET_BF_EMPTY(bf_table, bf_num, bf_index, bucket) (bf_table[((bucket * bf_num)+bf_index) >> 3] &= (~0x80 >> (((bucket * bf_num)+bf_index) & 7)))\n"
+"\n\n"
+
+"#define lfta_BOOL_to_hash(x) (x)\n"
+"#define lfta_USHORT_to_hash(x) (x)\n"
+"#define lfta_UINT_to_hash(x) (x)\n"
+"#define lfta_IP_to_hash(x) (x)\n"
+"#define lfta_IPV6_to_hash(x) ( (x.v[0]) ^ (x.v[1]) ^ (x.v[2]) ^ (x.v[3]))\n"
+"#define lfta_INT_to_hash(x) (gs_uint32_t)(x)\n"
+"#define lfta_ULLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
+"#define lfta_LLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
+"#define lfta_FLOAT_to_hash(x) ( (( ((gs_uint64_t)(x)) >>32)&0xffffffff) ^ (((gs_uint64_t)(x))&0xffffffff) )\n"
+"static gs_uint32_t lfta_V_STR_to_hash(struct gs_string x){\n"
+" gs_uint32_t i,ret=0,tmp_sum = 0;\n"
+" for(i=0;i<x.length;++i){\n"
+" tmp_sum |= (x.data[i]) << (8*(i%4));\n"
+" if((i%4) == 3){\n"
+" ret ^= tmp_sum;\n"
+" tmp_sum = 0;\n"
+" }\n"
+" }\n"
+" if((i%4)!=0) ret ^=tmp_sum;\n"
+" return(ret);\n"
+"}\n\n\n";
+
+
+
+//////////////////////////////////////////////////////////////////
+///// Get all of the query parse trees
+
+
+ int i,p;
+ int hfta_count = 0; // for numeric suffixes to hfta .cc files
+
+//---------------------------
+// Global info needed for post processing.
+
+// Set of operator views ref'd in the query set.
+ opview_set opviews;
+// lftas on a per-machine basis.
+ map<string, vector<stream_query *> > lfta_mach_lists;
+ int nfiles = input_file_names.size();
+ vector<stream_query *> hfta_list; // list of hftas.
+ map<string, stream_query *> sq_map; // map from query name to stream query.
+
+
+//////////////////////////////////////////
+
+// Open and parse the interface resources file.
+ ifq_t *ifaces_db = new ifq_t();
+ string ierr;
+ if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
+ fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
+ ifx_fname.c_str(), ierr.c_str());
+ exit(1);
+ }
+ if(ifaces_db->load_ifqs(ifq_fls[0], ierr)){
+ fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
+ ifq_fls[0].c_str(), ierr.c_str());
+ exit(1);
+ }
+
+ map<string, string> qname_to_flname; // for detecting duplicate query names
+
+
+
+// Parse the files to create a vector of parse trees.
+// Load qnodes with information to perform a topo sort
+// based on query dependencies.
+ vector<query_node *> qnodes; // for topo sort.
+ map<string,int> name_node_map; // map query name to qnodes entry
+ for(i=0;i<input_file_names.size();i++){
+
+ if((fta_in = fopen(input_file_names[i].c_str(), "r")) == NULL) {
+ fprintf(stderr,"Can't open input file %s\n%s",input_file_names[i].c_str(),strerror(errno));
+ continue;
+ }
+fprintf(stderr,"Parsing file %s\n",input_file_names[i].c_str());
+
+// Parse the FTA query
+ fta_parse_result = new fta_parse_t();
+ FtaParser_setfileinput(fta_in);
+ if(FtaParserparse()){
+ fprintf(stderr,"FTA parse failed.\n");
+ exit(1);
+ }
+ if(fta_parse_result->parse_type != QUERY_PARSE){
+ fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
+ exit(1);
+ }
+
+// returns a list of parse trees
+ vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
+ for(p=0;p<qlist.size();++p){
+ table_exp_t *fta_parse_tree = qlist[p];
+// query_parse_trees.push_back(fta_parse_tree);
+
+// compute the default name -- extract from query name
+ strcpy(tmpstr,input_file_names[i].c_str());
+ char *qname = strrchr(tmpstr,PATH_DELIM);
+ if(qname == NULL)
+ qname = tmpstr;
+ else
+ qname++;
+ char *qname_end = strchr(qname,'.');
+ if(qname_end != NULL) *qname_end = '\0';
+ string qname_str = qname;
+ string imputed_qname = impute_query_name(fta_parse_tree, qname_str);
+
+//			Determine visibility.  Should I be attaching all of the output methods?
+ if(qname_to_ospec.count(imputed_qname)>0)
+ fta_parse_tree->set_visible(true);
+ else
+ fta_parse_tree->set_visible(false);
+
+
+//	Create a manipulable representation of the parse tree.
+// the qnode inherits the visibility assigned to the parse tree.
+ int pos = qnodes.size();
+ qnodes.push_back( new query_node(pos, imputed_qname, input_file_names[i], fta_parse_tree ));
+ name_node_map[ qnodes[pos]->name ] = pos;
+//printf("name_node_map[ %s ] = %d\n",qnodes[pos]->name.c_str(),pos);
+// qnames.push_back(impute_query_name(fta_parse_tree, qname_str));
+// qfiles.push_back(i);
+
+// Check for duplicate query names
+// NOTE : in hfta-only generation, I should
+// also check with the names of the registered queries.
+ if(qname_to_flname.count(qnodes[pos]->name) > 0){
+ fprintf(stderr,"ERROR duplicate query name %s in files %s and %s.\n",
+ qnodes[pos]->name.c_str(), (qname_to_flname[qnodes[pos]->name]).c_str(), input_file_names[i].c_str());
+ exit(1);
+ }
+ if(Schema->find_tbl(qnodes[pos]->name) >= 0){
+ fprintf(stderr,"ERROR duplicate query name %s (file %s) was already defined as a PROTOCOL.\n",
+ qnodes[pos]->name.c_str(), input_file_names[i].c_str());
+ exit(1);
+ }
+ qname_to_flname[qnodes[pos]->name] = input_file_names[i].c_str();
+
+
+ }
+ }
+
+// Add the library queries
+
+ int pos;
+ for(pos=0;pos<qnodes.size();++pos){
+ int fi;
+ for(fi = 0;fi<qnodes[pos]->refd_tbls.size();++fi){
+ string src_tbl = qnodes[pos]->refd_tbls[fi];
+ if(qname_to_flname.count(src_tbl) == 0){
+ int last_sep = src_tbl.find_last_of('/');
+ if(last_sep != string::npos){
+fprintf(stderr,"Finding library query %s for query %s\n",src_tbl.c_str(),qnodes[pos]->name.c_str());
+ string target_qname = src_tbl.substr(last_sep+1);
+ string qpathname = library_path + src_tbl + ".gsql";
+ if((fta_in = fopen(qpathname.c_str(), "r")) == NULL) {
+ fprintf(stderr,"Can't open library file %s, referenced by query %s in file %s\n\t%s\n",qpathname.c_str(),qnodes[pos]->name.c_str(), qname_to_flname[qnodes[pos]->name].c_str(), strerror(errno));
+ exit(1);
+ }
+fprintf(stderr,"Parsing file %s\n",qpathname.c_str());
+// Parse the FTA query
+ fta_parse_result = new fta_parse_t();
+ FtaParser_setfileinput(fta_in);
+ if(FtaParserparse()){
+ fprintf(stderr,"FTA parse failed.\n");
+ exit(1);
+ }
+ if(fta_parse_result->parse_type != QUERY_PARSE){
+					fprintf(stderr,"ERROR, file %s is not a query file.\n",qpathname.c_str());
+ exit(1);
+ }
+
+ map<string, int> local_query_map;
+ vector<string> local_query_names;
+ vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
+ for(p=0;p<qlist.size();++p){
+ table_exp_t *fta_parse_tree = qlist[p];
+ fta_parse_tree->set_visible(false); // assumed to not produce output
+ string imputed_qname = impute_query_name(fta_parse_tree, target_qname);
+ if(imputed_qname == target_qname)
+ imputed_qname = src_tbl;
+ if(local_query_map.count(imputed_qname)>0){
+ fprintf(stderr,"ERROR, multiple queries named %s in library file %s\n",imputed_qname.c_str(), qpathname.c_str());
+ exit(1);
+ }
+ local_query_map[ imputed_qname ] = p;
+ local_query_names.push_back(imputed_qname);
+ }
+
+ if(local_query_map.count(src_tbl)==0){
+ fprintf(stderr,"ERROR, library query file %s has no query named %s\n",qpathname.c_str(),target_qname.c_str());
+ exit(1);
+ }
+
+ vector<int> worklist;
+ set<int> added_queries;
+ vector<query_node *> new_qnodes;
+				// the target query was keyed under src_tbl (its full library path) above
+				worklist.push_back(local_query_map[src_tbl]);
+				added_queries.insert(local_query_map[src_tbl]);
+ int qq;
+ int qpos = qnodes.size();
+ for(qq=0;qq<worklist.size();++qq){
+ int q_id = worklist[qq];
+ query_node *new_qnode = new query_node(qpos+qq, local_query_names[q_id], qpathname, qlist[q_id] );
+ new_qnodes.push_back( new_qnode);
+ vector<string> refd_tbls = new_qnode->refd_tbls;
+ int ff;
+ for(ff = 0;ff<refd_tbls.size();++ff){
+ if(local_query_map.count(refd_tbls[ff])>0 && added_queries.count(local_query_map[refd_tbls[ff]])==0){
+
+ if(name_node_map.count(refd_tbls[ff])>0){
+ fprintf(stderr,"ERROR, query %s occurs both in the regular query set, file %s, and in library file %s\n",refd_tbls[ff].c_str(), qname_to_flname[refd_tbls[ff]].c_str(), qpathname.c_str() );
+ exit(1);
+ }else{
+ worklist.push_back(local_query_map[refd_tbls[ff]]);
+ }
+ }
+ }
+ }
+
+ for(qq=0;qq<new_qnodes.size();++qq){
+ int qpos = qnodes.size();
+ qnodes.push_back(new_qnodes[qq]);
+ name_node_map[qnodes[qpos]->name ] = qpos;
+ qname_to_flname[qnodes[qpos]->name ] = qpathname;
+ }
+ }
+ }
+ }
+ }
+
+
+
+
+
+
+
+
+//---------------------------------------
+
+
+// Add the UDOPS.
+
+ string udop_missing_sources;
+ for(i=0;i<qnodes.size();++i){
+ int fi;
+ for(fi = 0;fi<qnodes[i]->refd_tbls.size();++fi){
+ int sid = Schema->find_tbl(qnodes[i]->refd_tbls[fi]);
+ if(sid >= 0){
+ if(Schema->get_schema_type(sid) == OPERATOR_VIEW_SCHEMA){
+ if(name_node_map.count(qnodes[i]->refd_tbls[fi]) == 0){
+ int pos = qnodes.size();
+ qnodes.push_back( new query_node(pos, qnodes[i]->refd_tbls[fi], Schema));
+ name_node_map[ qnodes[pos]->name ] = pos;
+				qnodes[pos]->is_externally_visible = false;   // not directly visible; marked visible later if a visible query reads it
+ // Need to mark the source queries as visible.
+ int si;
+ string missing_sources = "";
+ for(si=0;si<qnodes[pos]->refd_tbls.size();++si){
+ string src_tbl = qnodes[pos]->refd_tbls[si];
+ if(name_node_map.count(src_tbl)==0){
+ missing_sources += src_tbl + " ";
+ }
+ }
+ if(missing_sources != ""){
+ udop_missing_sources += "\tUDOP "+qnodes[pos]->name+" references undefined tables "+missing_sources+"\n";
+ }
+ }
+ }
+ }
+ }
+ }
+ if(udop_missing_sources != ""){
+ fprintf(stderr,"ERROR, User Defined OPerators reference source tables that are not part of the query set:\n%s",udop_missing_sources.c_str());
+ exit(1);
+ }
+
+
+
+////////////////////////////////////////////////////////////////////
+/// Check parse trees to verify that some
+/// global properties are met :
+/// if q1 reads from q2, then
+/// q2 is processed before q1
+/// q1 can supply q2's parameters
+/// Verify there is no cycle in the reads-from graph.
+
+// Compute an order in which to process the
+// queries.
+
+// Start by building the reads-from lists.
+//
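+// reads_from[i] holds the indices of the queries whose output query i consumes;
+// it drives the parameter-compatibility check, root detection, topological sort,
+// and cycle detection below.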
+
+ for(i=0;i<qnodes.size();++i){
+ int qi, fi;
+ vector<string> refd_tbls = qnodes[i]->refd_tbls;
+ for(fi = 0;fi<refd_tbls.size();++fi){
+ if(name_node_map.count(refd_tbls[fi])>0){
+//printf("query %d (%s) reads from %s (%d)\n", i, qnodes[i]->name.c_str(),refd_tbls[fi].c_str(),name_node_map[refd_tbls[fi]]);
+ (qnodes[i]->reads_from).insert(name_node_map[refd_tbls[fi]]);
+ }
+ }
+ }
+
+
+// If one query reads the result of another,
+// check for parameter compatibility. Currently it must
+// be an exact match. I will move to requiring
+// containment after re-ordering, but will require
+// some analysis for code generation which is not
+// yet in place.
+//printf("There are %d query nodes.\n",qnodes.size());
+
+
+ for(i=0;i<qnodes.size();++i){
+ vector<var_pair_t *> target_params = qnodes[i]->params;
+ for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
+ vector<var_pair_t *> source_params = qnodes[(*si)]->params;
+ if(target_params.size() != source_params.size()){
+ fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
+ exit(1);
+ }
+ int p;
+ for(p=0;p<target_params.size();++p){
+ if(! (target_params[p]->name == source_params[p]->name &&
+ target_params[p]->val == source_params[p]->val ) ){
+ fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
+ exit(1);
+ }
+ }
+ }
+ }
+
+
+// Find the roots.
+// Start by counting inedges.
+ for(i=0;i<qnodes.size();++i){
+ for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
+ qnodes[(*si)]->n_consumers++;
+ }
+ }
+
+// The roots are the nodes with indegree zero.
+ set<int> roots;
+ for(i=0;i<qnodes.size();++i){
+ if(qnodes[i]->n_consumers == 0){
+ if(qnodes[i]->is_externally_visible == false){
+ fprintf(stderr,"WARNING: query %s (file %s) is a root query but it isn't visible. Ignored.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str());
+ }
+ roots.insert(i);
+ }
+ }
+
+// Remove the parts of the subtree that produce no output.
+ set<int> valid_roots;
+ set<int> discarded_nodes;
+ set<int> candidates;
+ while(roots.size() >0){
+ for(si=roots.begin();si!=roots.end();++si){
+ if(qnodes[(*si)]->is_externally_visible){
+ valid_roots.insert((*si));
+ }else{
+ discarded_nodes.insert((*si));
+ set<int>::iterator sir;
+ for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
+ qnodes[(*sir)]->n_consumers--;
+ if(qnodes[(*sir)]->n_consumers == 0)
+ candidates.insert( (*sir));
+ }
+ }
+ }
+ roots = candidates;
+ candidates.clear();
+ }
+ roots = valid_roots;
+ if(discarded_nodes.size()>0){
+ fprintf(stderr,"Warning, the following queries were discarded because they produce no output:\n");
+ int di = 0;
+ for(si=discarded_nodes.begin();si!=discarded_nodes.end();++si){
+ if(di>0 && (di%8)==0) fprintf(stderr,"\n");
+ di++;
+ fprintf(stderr," %s",qnodes[(*si)]->name.c_str());
+ }
+ fprintf(stderr,"\n");
+ }
+
+// Compute the sources_to set, ignoring discarded nodes.
+ for(i=0;i<qnodes.size();++i){
+ if(discarded_nodes.count(i)==0)
+ for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
+ qnodes[(*si)]->sources_to.insert(i);
+ }
+ }
+
+
+// Find the nodes that are shared by multiple visible subtrees.
+// The roots become inferred visible nodes.
+
+// Find the visible nodes.
+ vector<int> visible_nodes;
+ for(i=0;i<qnodes.size();i++){
+ if(qnodes[i]->is_externally_visible){
+ visible_nodes.push_back(i);
+ }
+ }
+
+// Find UDOPs referenced by visible nodes.
+ list<int> workq;
+ for(i=0;i<visible_nodes.size();++i){
+ workq.push_back(visible_nodes[i]);
+ }
+ while(!workq.empty()){
+ int node = workq.front();
+ workq.pop_front();
+ set<int>::iterator children;
+ if(qnodes[node]->is_udop && qnodes[node]->is_externally_visible == false){
+ qnodes[node]->is_externally_visible = true;
+ visible_nodes.push_back(node);
+ for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
+ if(qnodes[(*children)]->is_externally_visible == false){
+ qnodes[(*children)]->is_externally_visible = true;
+ visible_nodes.push_back((*children));
+ }
+ }
+ }
+ for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
+ workq.push_back((*children));
+ }
+ }
+
+ bool done = false;
+ while(!done){
+// reset the nodes
+ for(i=0;i<qnodes.size();i++){
+ qnodes[i]->subtree_roots.clear();
+ }
+
+// Walk the tree defined by a visible node, not descending into
+// subtrees rooted by a visible node. Mark the node visited with
+// the visible node ID.
+ for(i=0;i<visible_nodes.size();++i){
+ set<int> vroots;
+ vroots.insert(visible_nodes[i]);
+ while(vroots.size()>0){
+ for(si=vroots.begin();si!=vroots.end();++si){
+ qnodes[(*si)]->subtree_roots.insert(visible_nodes[i]);
+
+ set<int>::iterator sir;
+ for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
+ if(! qnodes[(*sir)]->is_externally_visible){
+ candidates.insert( (*sir));
+ }
+ }
+ }
+ vroots = candidates;
+ candidates.clear();
+ }
+ }
+// Find the nodes in multiple visible node subtrees, but with no parent
+// that is in multiple visible node subtrees.  Mark these as inferred visible nodes.
+ done = true; // until proven otherwise
+ for(i=0;i<qnodes.size();i++){
+ if(qnodes[i]->subtree_roots.size()>1){
+ bool is_new_root = true;
+ set<int>::iterator sir;
+ for(sir=qnodes[i]->sources_to.begin(); sir!=qnodes[i]->sources_to.end();++sir){
+ if(qnodes[(*sir)]->subtree_roots.size()>1)
+ is_new_root = false;
+ }
+ if(is_new_root){
+ qnodes[i]->is_externally_visible = true;
+ qnodes[i]->inferred_visible_node = true;
+ visible_nodes.push_back(i);
+ done = false;
+ }
+ }
+ }
+ }
+
+
+
+
+
+// get visible nodes in topo ordering.
+// for(i=0;i<qnodes.size();i++){
+// qnodes[i]->n_consumers = qnodes[i]->sources_to.size();
+// }
+ vector<int> process_order;
+ while(roots.size() >0){
+ for(si=roots.begin();si!=roots.end();++si){
+ if(discarded_nodes.count((*si))==0){
+ process_order.push_back( (*si) );
+ }
+ set<int>::iterator sir;
+ for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
+ qnodes[(*sir)]->n_consumers--;
+ if(qnodes[(*sir)]->n_consumers == 0)
+ candidates.insert( (*sir));
+ }
+ }
+ roots = candidates;
+ candidates.clear();
+ }
+
+
+//printf("process_order.size() =%d\n",process_order.size());
+
+// Search for cyclic dependencies
+ string found_dep;
+ for(i=0;i<qnodes.size();++i){
+ if(discarded_nodes.count(i)==0 && qnodes[i]->n_consumers > 0){
+ if(found_dep.size() != 0) found_dep += ", ";
+ found_dep += "query "+qnodes[i]->name+" (file "+qnodes[i]->file+")";
+ }
+ }
+ if(found_dep.size()>0){
+ fprintf(stderr,"ERROR, the following queries contain a cyclic reads-from dependency:\n%s\n",found_dep.c_str());
+ exit(1);
+ }
+
+// Get a list of query sets, in the order to be processed.
+// Start at visible root and do bfs.
+// The query set includes queries referenced indirectly,
+// as sources for user-defined operators. These are needed
+// to ensure that they are added to the schema, but are not part
+// of the query tree.
+
+// stream_node_sets contains queries reachable only through the
+// FROM clause, so I can tell which queries to add to the stream
+// query. (DISABLED, UDOPS are integrated, does this cause problems?)
+
+// NOTE: this code works because in order for data to be
+// read by multiple hftas, the node must be externally visible.
+// But visible nodes define roots of process sets.
+// internally visible nodes can feed data only
+// to other nodes in the same query file.
+//	Therefore, any access can be restricted to a file;
+//	hfta output sharing is done only on roots,
+//	never on interior nodes.
+
+
+
+
+// Compute the base collection of hftas.
+ vector<hfta_node *> hfta_sets;
+ map<string, int> hfta_name_map;
+// vector< vector<int> > process_sets;
+// vector< set<int> > stream_node_sets;
+ reverse(process_order.begin(), process_order.end()); // get listing in reverse order.
+ // i.e. process leaves 1st.
+ for(i=0;i<process_order.size();++i){
+ if(qnodes[process_order[i]]->is_externally_visible == true){
+//printf("Visible.\n");
+ int root = process_order[i];
+ hfta_node *hnode = new hfta_node();
+ hnode->name = qnodes[root]-> name;
+ hnode->source_name = qnodes[root]-> name;
+ hnode->is_udop = qnodes[root]->is_udop;
+ hnode->inferred_visible_node = qnodes[root]->inferred_visible_node;
+
+ vector<int> proc_list; proc_list.push_back(root);
+// Ensure that nodes are added only once.
+ set<int> proc_set; proc_set.insert(root);
+ roots.clear(); roots.insert(root);
+ candidates.clear();
+ while(roots.size()>0){
+ for(si=roots.begin();si!=roots.end();++si){
+//printf("Processing root %d\n",(*si));
+ set<int>::iterator sir;
+ for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
+//printf("reads fom %d\n",(*sir));
+ if(qnodes[(*sir)]->is_externally_visible==false){
+ candidates.insert( (*sir) );
+ if(proc_set.count( (*sir) )==0){
+ proc_set.insert( (*sir) );
+ proc_list.push_back( (*sir) );
+ }
+ }
+ }
+ }
+ roots = candidates;
+ candidates.clear();
+ }
+
+ reverse(proc_list.begin(), proc_list.end());
+ hnode->query_node_indices = proc_list;
+ hfta_name_map[hnode->name] = hfta_sets.size();
+ hfta_sets.push_back(hnode);
+ }
+ }
+
+// Compute the reads_from / sources_to graphs for the hftas.
+
+ for(i=0;i<hfta_sets.size();++i){
+ hfta_node *hnode = hfta_sets[i];
+ for(q=0;q<hnode->query_node_indices.size();q++){
+ query_node *qnode = qnodes[hnode->query_node_indices[q]];
+ for(s=0;s<qnode->refd_tbls.size();++s){
+ if(hfta_name_map.count(qnode->refd_tbls[s])){
+ int other_hfta = hfta_name_map[qnode->refd_tbls[s]];
+ hnode->reads_from.insert(other_hfta);
+ hfta_sets[other_hfta]->sources_to.insert(i);
+ }
+ }
+ }
+ }
+
+// Compute a topological sort of the hfta_sets.
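+// (Kahn-style: start from the hftas with no consumers -- empty sources_to -- and emit
+// an hfta once all of its consumers are emitted, so consumers precede their sources.)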
+
+ vector<int> hfta_topsort;
+ workq.clear();
+	vector<int> hnode_srcs(hfta_sets.size(), 0);	// avoid a variable-length array, which is non-standard C++
+ for(i=0;i<hfta_sets.size();++i){
+ hnode_srcs[i] = 0;
+ if(hfta_sets[i]->sources_to.size() == 0)
+ workq.push_back(i);
+ }
+
+ while(! workq.empty()){
+ int node = workq.front();
+ workq.pop_front();
+ hfta_topsort.push_back(node);
+ set<int>::iterator stsi;
+ for(stsi=hfta_sets[node]->reads_from.begin();stsi!=hfta_sets[node]->reads_from.end();++stsi){
+ int parent = (*stsi);
+ hnode_srcs[parent]++;
+ if(hnode_srcs[parent] == hfta_sets[parent]->sources_to.size()){
+ workq.push_back(parent);
+ }
+ }
+ }
+
+// Decorate hfta nodes with the level of parallelism given as input.
+
+ map<string, int>::iterator msii;
+ for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
+ string hfta_name = (*msii).first;
+ int par = (*msii).second;
+ if(hfta_name_map.count(hfta_name) > 0){
+ hfta_sets[ hfta_name_map[hfta_name] ]->n_parallel = par;
+ }else{
+			fprintf(stderr,"Warning, hfta_parallelism.cfg has an entry for %s, but it is not an hfta.\n",hfta_name.c_str());
+ }
+ }
+
+// Propagate levels of parallelism: children should have a level of parallelism
+// as large as any of its parents. Adjust children upwards to compensate.
+// Start at parents and adjust children, auto-propagation will occur.
+
+ for(i=hfta_sets.size()-1;i>=0;i--){
+ set<int>::iterator stsi;
+ for(stsi=hfta_sets[i]->reads_from.begin();stsi!=hfta_sets[i]->reads_from.end();++stsi){
+ if(hfta_sets[i]->n_parallel > hfta_sets[ (*stsi) ]->n_parallel){
+ hfta_sets[ (*stsi) ]->n_parallel = hfta_sets[i]->n_parallel;
+ }
+ }
+ }
+
+// Before all the name mangling, check if there are any output_spec.cfg
+// or hfta_parallelism.cfg entries that do not have a matching query.
+
+ string dangling_ospecs = "";
+ for(msii=qname_to_ospec.begin();msii!=qname_to_ospec.end();++msii){
+ string oq = (*msii).first;
+ if(hfta_name_map.count(oq) == 0){
+ dangling_ospecs += " "+(*msii).first;
+ }
+ }
+ if(dangling_ospecs!=""){
+		fprintf(stderr,"ERROR, the following entries in output_spec.cfg don't have a matching query: %s\n",dangling_ospecs.c_str());
+ exit(1);
+ }
+
+ string dangling_par = "";
+ for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
+ string oq = (*msii).first;
+ if(hfta_name_map.count(oq) == 0){
+ dangling_par += " "+(*msii).first;
+ }
+ }
+ if(dangling_par!=""){
+ fprintf(stderr,"WARNING, the following entries in hfta_parallelism.cfg don't have a matching query: %s\n",dangling_par.c_str());
+ }
+
+
+
+// Replicate parallelized hftas. Do __copyX name mangling. Adjust
+// FROM clauses: retarget any name which is an internal node, and
+// any which is in hfta_sets (and which is parallelized). Add Merge nodes
+// when the source hfta has more parallelism than the target node.
+// Add new nodes to qnodes and hfta_nodes with the appropriate linkages.
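+// e.g. (illustrative): with n_parallel = 2, hfta "q" becomes q__copy0 and q__copy1;
+// a consumer with lower parallelism reads them through a generated <consumer>__merge<N> node.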
+
+
+ int n_original_hfta_sets = hfta_sets.size();
+ for(i=0;i<n_original_hfta_sets;++i){
+ if(hfta_sets[i]->n_parallel > 1){
+			hfta_sets[i]->do_generation = false; // this hfta is superseded by its parallel copies, don't generate it.
+ set<string> local_nodes; // names of query nodes in the hfta.
+ for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
+ local_nodes.insert(qnodes[ hfta_sets[i]->query_node_indices[h] ]->name);
+ }
+
+ for(p=0;p<hfta_sets[i]->n_parallel;++p){
+ string mangler = "__copy"+int_to_string(p);
+ hfta_node *par_hfta = new hfta_node();
+ par_hfta->name = hfta_sets[i]->name + mangler;
+ par_hfta->source_name = hfta_sets[i]->name;
+ par_hfta->is_udop = hfta_sets[i]->is_udop;
+ par_hfta->inferred_visible_node = hfta_sets[i]->inferred_visible_node;
+ par_hfta->n_parallel = hfta_sets[i]->n_parallel;
+ par_hfta->parallel_idx = p;
+
+ map<string, int> par_qnode_map; // qnode name-to-idx, aids dependency tracking.
+
+// Is it a UDOP?
+ if(hfta_sets[i]->is_udop){
+ int root = hfta_sets[i]->query_node_indices[0];
+
+ string unequal_par_sources;
+ set<int>::iterator rfsii;
+ for(rfsii=hfta_sets[i]->reads_from.begin();rfsii!=hfta_sets[i]->reads_from.end();++rfsii){
+ if(hfta_sets[(*rfsii)]->n_parallel != par_hfta->n_parallel){
+					unequal_par_sources += hfta_sets[(*rfsii)]->name+" ("+int_to_string(hfta_sets[(*rfsii)]->n_parallel)+") ";
+ }
+ }
+ if(unequal_par_sources != ""){
+ fprintf(stderr,"ERROR, UDOP %s has parallelism %d, but some of its sources have a different parallelism: %s\n",hfta_sets[i]->name.c_str(), hfta_sets[i]->n_parallel, unequal_par_sources.c_str());
+ exit(1);
+ }
+
+ int rti;
+ vector<string> new_sources;
+ for(rti=0;rti<qnodes[root]->refd_tbls.size();++rti){
+ new_sources.push_back(qnodes[root]->refd_tbls[rti]+mangler);
+ }
+
+ query_node *new_qn = new query_node(qnodes.size(), qnodes[root]->name, Schema);
+ new_qn->name += mangler;
+ new_qn->mangler = mangler;
+ new_qn->refd_tbls = new_sources;
+ par_hfta->query_node_indices.push_back(qnodes.size());
+ par_qnode_map[new_qn->name] = qnodes.size();
+ name_node_map[ new_qn->name ] = qnodes.size();
+ qnodes.push_back(new_qn);
+ }else{
+// regular query node
+ for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
+ int hqn_idx = hfta_sets[i]->query_node_indices[h];
+ table_exp_t *dup_pt = dup_table_exp(qnodes[hqn_idx]->parse_tree);
+// rehome the from clause on mangled names.
+// create merge nodes as needed for external sources.
+ for(f=0;f<dup_pt->fm->tlist.size();++f){
+ if(local_nodes.count(dup_pt->fm->tlist[f]->schema_name)>0){
+ dup_pt->fm->tlist[f]->schema_name += mangler;
+ }else if(hfta_name_map.count(dup_pt->fm->tlist[f]->schema_name)>0){
+//	Ref's an external HFTA.  No parallelism => leave as is.  Else, if the level of parallelism of the two hftas is the same, rename by the mangler.  Else, there must be more sources, so create a merge node.
+ int other_hidx = hfta_name_map[dup_pt->fm->tlist[f]->schema_name];
+ if(par_hfta->n_parallel == hfta_sets[other_hidx]->n_parallel){
+ dup_pt->fm->tlist[f]->schema_name += mangler;
+ }else{
+ vector<string> src_tbls;
+ int stride = hfta_sets[other_hidx]->n_parallel / par_hfta->n_parallel;
+ if(stride == 0){
+ fprintf(stderr,"INTERNAL ERROR, parent hfta %s has a larger parallelism than its child %s\n",par_hfta->name.c_str(), hfta_sets[other_hidx]->name.c_str());
+ exit(1);
+ }
+ for(s=0;s<stride;++s){
+ string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s+stride*p);
+ src_tbls.push_back(ext_src_name);
+ }
+ table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
+ string merge_node_name = qnodes[hqn_idx]->name+"__copy"+int_to_string(p)+"__merge"+int_to_string(f);
+ dup_pt->fm->tlist[f]->schema_name = merge_node_name;
+// Make a qnode to represent the new merge node
+ query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
+ qn_pt->refd_tbls = src_tbls;
+ qn_pt->is_udop = false;
+ qn_pt->is_externally_visible = false;
+ qn_pt->inferred_visible_node = false;
+ par_hfta->query_node_indices.push_back(qnodes.size());
+ par_qnode_map[merge_node_name] = qnodes.size();
+ name_node_map[ merge_node_name ] = qnodes.size();
+ qnodes.push_back(qn_pt);
+ }
+ }
+ }
+ query_node *new_qn = new query_node(qnodes.size(),qnodes[hqn_idx]->name+"__copy"+int_to_string(p),qnodes[hqn_idx]->file,dup_pt);
+ for(f=0;f<dup_pt->fm->tlist.size();++f){
+ new_qn->refd_tbls.push_back(dup_pt->fm->tlist[f]->schema_name);
+ }
+ new_qn->params = qnodes[hqn_idx]->params;
+ new_qn->is_udop = false;
+ new_qn->is_externally_visible = qnodes[hqn_idx]->is_externally_visible;
+ new_qn->inferred_visible_node = qnodes[hqn_idx]->inferred_visible_node;
+ par_hfta->query_node_indices.insert(par_hfta->query_node_indices.begin(),qnodes.size());
+ par_qnode_map[new_qn->name] = qnodes.size();
+ name_node_map[ new_qn->name ] = qnodes.size();
+ qnodes.push_back(new_qn);
+ }
+ }
+ hfta_name_map[par_hfta->name] = hfta_sets.size();
+ hfta_sets.push_back(par_hfta);
+ }
+ }else{
+// This hfta isn't being parallelized, but add merge nodes for any parallelized
+// hfta sources.
+ if(!hfta_sets[i]->is_udop){
+ for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
+ int hqn_idx = hfta_sets[i]->query_node_indices[h];
+ for(f=0;f<qnodes[hqn_idx]->parse_tree->fm->tlist.size();++f){
+ if(hfta_name_map.count(qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name)>0){
+//	Ref's an external HFTA.  No parallelism => leave as is.  Else, if the level of parallelism of the two hftas is the same, rename by the mangler.  Else, there must be more sources, so create a merge node.
+ int other_hidx = hfta_name_map[qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name];
+ if(hfta_sets[i]->n_parallel != hfta_sets[other_hidx]->n_parallel){
+ vector<string> src_tbls;
+ for(s=0;s<hfta_sets[other_hidx]->n_parallel;++s){
+ string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s);
+ src_tbls.push_back(ext_src_name);
+ }
+ table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
+ string merge_node_name = qnodes[hqn_idx]->name+"__merge"+int_to_string(f);
+ qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name = merge_node_name;
+// Make a qnode to represent the new merge node
+ query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
+ qn_pt->refd_tbls = src_tbls;
+ qn_pt->is_udop = false;
+ qn_pt->is_externally_visible = false;
+ qn_pt->inferred_visible_node = false;
+ hfta_sets[i]->query_node_indices.insert(hfta_sets[i]->query_node_indices.begin(),qnodes.size());
+ name_node_map[ merge_node_name ] = qnodes.size();
+ qnodes.push_back(qn_pt);
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+// Rebuild the reads_from / sources_to lists in the qnodes
+ for(q=0;q<qnodes.size();++q){
+ qnodes[q]->reads_from.clear();
+ qnodes[q]->sources_to.clear();
+ }
+ for(q=0;q<qnodes.size();++q){
+ for(s=0;s<qnodes[q]->refd_tbls.size();++s){
+ if(name_node_map.count(qnodes[q]->refd_tbls[s])>0){
+ int rf = name_node_map[qnodes[q]->refd_tbls[s]];
+ qnodes[q]->reads_from.insert(rf);
+ qnodes[rf]->sources_to.insert(q);
+ }
+ }
+ }
+
+// Rebuild the reads_from / sources_to lists in hfta_sets
+ for(q=0;q<hfta_sets.size();++q){
+ hfta_sets[q]->reads_from.clear();
+ hfta_sets[q]->sources_to.clear();
+ }
+ for(q=0;q<hfta_sets.size();++q){
+ for(s=0;s<hfta_sets[q]->query_node_indices.size();++s){
+ int node = hfta_sets[q]->query_node_indices[s];
+ set<int>::iterator rfsii;
+ for(rfsii=qnodes[node]->reads_from.begin();rfsii!=qnodes[node]->reads_from.end();++rfsii){
+ if(hfta_name_map.count(qnodes[(*rfsii)]->name)>0){
+ hfta_sets[q]->reads_from.insert(hfta_name_map[qnodes[(*rfsii)]->name]);
+ hfta_sets[hfta_name_map[qnodes[(*rfsii)]->name]]->sources_to.insert(q);
+ }
+ }
+ }
+ }
+
+/*
+for(q=0;q<qnodes.size();++q){
+ printf("qnode %d reads-from %d:",q,qnodes[q]->reads_from.size());
+ set<int>::iterator rsii;
+ for(rsii=qnodes[q]->reads_from.begin();rsii!=qnodes[q]->reads_from.end();++rsii)
+ printf(" %d",(*rsii));
+ printf(", and sources-to %d:",qnodes[q]->sources_to.size());
+ for(rsii=qnodes[q]->sources_to.begin();rsii!=qnodes[q]->sources_to.end();++rsii)
+ printf(" %d",(*rsii));
+ printf("\n");
+}
+
+for(q=0;q<hfta_sets.size();++q){
+ if(hfta_sets[q]->do_generation==false)
+ continue;
+ printf("hfta %d (%s) reads-from %d:",q,hfta_sets[q]->name.c_str(),hfta_sets[q]->reads_from.size());
+ set<int>::iterator rsii;
+ for(rsii=hfta_sets[q]->reads_from.begin();rsii!=hfta_sets[q]->reads_from.end();++rsii)
+ printf(" %d",(*rsii));
+ printf(", and sources-to %d:",hfta_sets[q]->sources_to.size());
+ for(rsii=hfta_sets[q]->sources_to.begin();rsii!=hfta_sets[q]->sources_to.end();++rsii)
+ printf(" %d",(*rsii));
+ printf("\n");
+}
+*/
+
+
+
+// Re-topo sort the hftas
+ hfta_topsort.clear();
+ workq.clear();
+	vector<int> hnode_srcs_2(hfta_sets.size(), 0);	// as above, avoid a non-standard VLA
+ for(i=0;i<hfta_sets.size();++i){
+ hnode_srcs_2[i] = 0;
+ if(hfta_sets[i]->sources_to.size() == 0 && hfta_sets[i]->do_generation){
+ workq.push_back(i);
+ }
+ }
+
+ while(workq.empty() == false){
+ int node = workq.front();
+ workq.pop_front();
+ hfta_topsort.push_back(node);
+ set<int>::iterator stsii;
+ for(stsii=hfta_sets[node]->reads_from.begin();stsii!=hfta_sets[node]->reads_from.end();++stsii){
+ int child = (*stsii);
+ hnode_srcs_2[child]++;
+ if(hnode_srcs_2[child] == hfta_sets[child]->sources_to.size()){
+ workq.push_back(child);
+ }
+ }
+ }
+
+// Ensure that all of the query_node_indices in hfta_sets are topologically
+// sorted, don't rely on assumptions that all transforms maintain some kind of order.
+ for(i=0;i<hfta_sets.size();++i){
+ if(hfta_sets[i]->do_generation){
+ map<int,int> n_accounted;
+ vector<int> new_order;
+ workq.clear();
+ vector<int>::iterator vii;
+ for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
+ n_accounted[(*vii)]= 0;
+ }
+ for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
+ set<int>::iterator rfsii;
+ for(rfsii=qnodes[(*vii)]->reads_from.begin();rfsii!=qnodes[(*vii)]->reads_from.end();++rfsii){
+ if(n_accounted.count((*rfsii)) == 0){
+ n_accounted[(*vii)]++;
+ }
+ }
+ if(n_accounted[(*vii)] == qnodes[(*vii)]->reads_from.size()){
+ workq.push_back((*vii));
+ }
+ }
+
+ while(workq.empty() == false){
+ int node = workq.front();
+ workq.pop_front();
+ new_order.push_back(node);
+ set<int>::iterator stsii;
+ for(stsii=qnodes[node]->sources_to.begin();stsii!=qnodes[node]->sources_to.end();++stsii){
+ if(n_accounted.count((*stsii))){
+ n_accounted[(*stsii)]++;
+ if(n_accounted[(*stsii)] == qnodes[(*stsii)]->reads_from.size()){
+ workq.push_back((*stsii));
+ }
+ }
+ }
+ }
+ hfta_sets[i]->query_node_indices = new_order;
+ }
+ }
+
+
+
+
+
+/// Global checking is done, start the analysis and translation
+/// of the query parse tree in the order specified by process_order
+
+
+// Get a list of the LFTAs for global lfta optimization
+//	TODO: separate building operators from splitting lftas,
+// that will make optimizations such as predicate pushing easier.
+ vector<stream_query *> lfta_list;
+ stream_query *rootq;
+ int qi,qj;
+
+ map<string, map<int, string> > schema_of_schemaid; // ensure that schema IDs are unique to an <interface, schema>
+
+ for(qi=hfta_topsort.size()-1;qi>=0;--qi){
+
+ int hfta_id = hfta_topsort[qi];
+ vector<int> curr_list = hfta_sets[hfta_id]->query_node_indices;
+
+
+
+// Two possibilities, either its a UDOP, or its a collection of queries.
+// if(qnodes[curr_list.back()]->is_udop)
+ if(hfta_sets[hfta_id]->is_udop){
+ int node_id = curr_list.back();
+ int udop_schref = Schema->find_tbl(qnodes[node_id]->file);
+ opview_entry *opv = new opview_entry();
+
+// Many of the UDOP properties aren't currently used.
+ opv->parent_qname = "no_parent";
+ opv->root_name = qnodes[node_id]->name;
+ opv->view_name = qnodes[node_id]->file;
+ opv->pos = qi;
+ sprintf(tmpstr,"%s_UDOP%d_%s",qnodes[node_id]->name.c_str(),qi,opv->view_name.c_str());
+ opv->udop_alias = tmpstr;
+ opv->mangler = qnodes[node_id]->mangler;
+
+ if(opv->mangler != ""){
+ int new_udop_schref = Schema->add_duplicate_table(opv->view_name,opv->root_name);
+ Schema->mangle_subq_names(new_udop_schref,opv->mangler);
+ }
+
+// This piece of code makes each hfta which refers to the same udop
+// reference a distinct running udop. Do this at query optimization time?
+// fmtbl->set_udop_alias(opv->udop_alias);
+
+ opv->exec_fl = Schema->get_op_prop(udop_schref, string("file"));
+ opv->liveness_timeout = atoi(Schema->get_op_prop(udop_schref, string("liveness_timeout")).c_str());
+
+ vector<subquery_spec *> subq = Schema->get_subqueryspecs(udop_schref);
+ int s,f,q;
+ for(s=0;s<subq.size();++s){
+// Validate that the fields match.
+ subquery_spec *sqs = subq[s];
+ string subq_name = sqs->name + opv->mangler;
+ vector<field_entry *> flds = Schema->get_fields(subq_name);
+ if(flds.size() == 0){
+ fprintf(stderr,"INTERNAL ERROR: the subquery %s of view %s not found in Schema.\n",subq_name.c_str(), opv->view_name.c_str());
+ return(1);
+ }
+ if(flds.size() < sqs->types.size()){
+ fprintf(stderr,"ERROR: subquery %s of view %s does not have enough fields (%lu found, %lu expected).\n",subq_name.c_str(), opv->view_name.c_str(),flds.size(), sqs->types.size());
+ return(1);
+ }
+ bool failed = false;
+ for(f=0;f<sqs->types.size();++f){
+ data_type dte(sqs->types[f],sqs->modifiers[f]);
+ data_type dtf(flds[f]->get_type(),flds[f]->get_modifier_list());
+ if(! dte.subsumes_type(&dtf) ){
+ fprintf(stderr,"ERROR: subquery %s of view %s does not have the correct type for field %d (%s found, %s expected).\n",subq_name.c_str(), opv->view_name.c_str(),f,dtf.to_string().c_str(), dte.to_string().c_str());
+ failed = true;
+ }
+/*
+ if(dte.is_temporal() && (dte.get_temporal() != dtf.get_temporal()) ){
+ string pstr = dte.get_temporal_string();
+ fprintf(stderr,"ERROR: subquery %s of view %s does not have the expected temporal value %s of field %d.\n",sqs->name.c_str(), opv->view_name.c_str(),pstr.c_str(),f);
+ failed = true;
+ }
+*/
+ }
+ if(failed)
+ return(1);
+/// Validation done, find the subquery, make a copy of the
+/// parse tree, and add it to the return list.
+ for(q=0;q<qnodes.size();++q)
+ if(qnodes[q]->name == subq_name)
+ break;
+ if(q==qnodes.size()){
+ fprintf(stderr,"INTERNAL ERROR: subquery %s of view %s not found in list of query names.\n",subq_name.c_str(), opv->view_name.c_str());
+ return(1);
+ }
+
+ }
+
+// Cross-link to the FROM entry(s) in all queries this UDOP sources to.
+ set<int>::iterator sii;
+ for(sii=qnodes[curr_list.back()]->sources_to.begin() ;sii!=qnodes[curr_list.back()]->sources_to.end();++sii){
+//printf("\tUDOP %s sources_to %d (%s)\n",hfta_sets[hfta_id]->name.c_str(),(*sii),hfta_sets[(*sii)]->name.c_str());
+ vector<tablevar_t *> tblvars = qnodes[(*sii)]->parse_tree->get_from()->get_table_list();
+ int ii;
+ for(ii=0;ii<tblvars.size();++ii){
+ if(tblvars[ii]->schema_name == opv->root_name){
+ tblvars[ii]->set_opview_idx(opviews.size());
+ }
+
+ }
+ }
+
+ opviews.append(opv);
+ }else{
+
+// Analyze the parse trees in this query,
+// put them in rootq
+// vector<int> curr_list = process_sets[qi];
+
+
+////////////////////////////////////////
+
+ rootq = NULL;
+//printf("Process set %d, has %d queries\n",qi,curr_list.size());
+ for(qj=0;qj<curr_list.size();++qj){
+ i = curr_list[qj];
+ fprintf(stderr,"Processing query %s (file %s) is_udop = %d\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(),qnodes[i]->is_udop);
+
+// Select the current query parse tree
+ table_exp_t *fta_parse_tree = qnodes[i]->parse_tree;
+
+// if hfta only, try to fetch any missing schemas
+// from the registry (using the print_schema program).
+// Here I use a hack to avoid analyzing the query -- all referenced
+//			tables must be in the FROM clause.
+//			If there is a problem loading any table, just issue a warning.
+//
+ tablevar_list_t *fm = fta_parse_tree->get_from();
+ vector<string> refd_tbls = fm->get_src_tbls(Schema);
+// iterate over all referenced tables
+ int t;
+ for(t=0;t<refd_tbls.size();++t){
+ int tbl_ref = Schema->get_table_ref(refd_tbls[t]);
+
+ if(tbl_ref < 0){ // if this table is not in the Schema
+
+ if(hfta_only){
+ string cmd="print_schema "+refd_tbls[t];
+ FILE *schema_in = popen(cmd.c_str(), "r");
+ if(schema_in == NULL){
+ fprintf(stderr,"ERROR: cannot execute command %s\n",cmd.c_str());
+ }else{
+ string schema_instr;
+ while(fgets(tmpstr,TMPSTRLEN,schema_in)){
+ schema_instr += tmpstr;
+ }
+ fta_parse_result = new fta_parse_t();
+ strcpy(tmp_schema_str,schema_instr.c_str());
+ FtaParser_setstringinput(tmp_schema_str);
+ if(FtaParserparse()){
+ fprintf(stderr,"FTA parse failed on result from command %s.\n",cmd.c_str());
+ }else{
+ if( fta_parse_result->tables != NULL){
+ int tl;
+ for(tl=0;tl<fta_parse_result->tables->size();++tl){
+ Schema->add_table(fta_parse_result->tables->get_table(tl));
+ }
+ }else{
+ fprintf(stderr,"ERROR command %s returned no tables.\n",cmd.c_str());
+ }
+ }
+ }
+ }else{
+ fprintf(stderr,"ERROR, query %s (file %s) references stream %s, which is neither a PROTOCOL nor an externally visible query stream.\n", qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), refd_tbls[t].c_str());
+ exit(1);
+ }
+
+ }
+ }
+
+
+// Analyze the query.
+ query_summary_class *qs = analyze_fta(fta_parse_tree, Schema, Ext_fcns, qnodes[i]->name);
+ if(qs == NULL){
+ fprintf(stderr,"analyze_fta failed on query %s (file %s) \n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str());
+ exit(1);
+ }
+
+ stream_query new_sq(qs, Schema);
+ if(new_sq.error_code){
+ fprintf(stderr,"ERROR, can't create query node for query %s (file %s):\n%s\n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str(),new_sq.err_str.c_str());
+ exit(1);
+ }
+
+// Add it to the Schema
+ table_def *output_td = new_sq.get_output_tabledef();
+ Schema->add_table(output_td);
+
+// Create a query plan from the analyzed parse tree.
+//	If it's a query referenced via FROM, add it to the stream query.
+ if(rootq){
+ rootq->add_query(new_sq);
+ }else{
+ rootq = new stream_query(new_sq);
+// have the stream query object inherit properties from the analyzed
+// hfta_node object.
+ rootq->set_nparallel(hfta_sets[hfta_id]->n_parallel,hfta_sets[hfta_id]->parallel_idx);
+ rootq->n_successors = hfta_sets[hfta_id]->sources_to.size();
+ }
+
+
+ }
+
+// This stream query has all its parts
+// Build and optimize it.
+//printf("translate_fta: generating plan.\n");
+ if(rootq->generate_plan(Schema)){
+ fprintf(stderr,"INTERNAL ERROR: Can't generate query plan for query %s.\n",rootq->query_name.c_str());
+ continue;
+ }
+
+// We've found the query plan head, so now add the output operators
+ if(qname_to_ospec.count(hfta_sets[hfta_id]->source_name)){
+ pair< multimap<string, int>::iterator, multimap<string, int>::iterator > oset;
+ multimap<string, int>::iterator mmsi;
+ oset = qname_to_ospec.equal_range(hfta_sets[hfta_id]->source_name);
+ for(mmsi=oset.first; mmsi!=oset.second; ++mmsi){
+ rootq->add_output_operator(output_specs[(*mmsi).second]);
+ }
+ }
+
+
+
+// Perform query splitting if necessary.
+ bool hfta_returned;
+ vector<stream_query *> split_queries = rootq->split_query(Ext_fcns, Schema, hfta_returned, ifaces_db, n_virtual_interfaces, hfta_sets[hfta_id]->n_parallel, hfta_sets[hfta_id]->parallel_idx);
+
+ int l;
+//for(l=0;l<split_queries.size();++l){
+//printf("split query %d is %s\n",l,split_queries[l]->query_name.c_str());
+//}
+
+
+
+
+ if(split_queries.size() > 0){ // should be at least one component.
+
+// Compute the number of LFTAs.
+ int n_lfta = split_queries.size();
+ if(hfta_returned) n_lfta--;
+// Check if a schemaId constraint needs to be inserted.
+
+// Process the LFTA components.
+ for(l=0;l<n_lfta;++l){
+ if(lfta_names.count(split_queries[l]->query_name) == 0){
+// Grab the lfta for global optimization.
+ vector<tablevar_t *> tvec = split_queries[l]->query_plan[0]->get_input_tbls();
+ string liface = "_local_";
+// string lmach = "";
+ string lmach = hostname;
+ if(tvec.size()>0){
+ liface = tvec[0]->get_interface(); // iface queries have been resolved
+ if(tvec[0]->get_machine() != ""){
+ lmach = tvec[0]->get_machine();
+ }else{
+ fprintf(stderr,"WARNING, lfta %s has empty machine name, using %s\n", split_queries[l]->query_plan[0]->node_name.c_str(), hostname.c_str());
+ }
+ } // else{
+ interface_names.push_back(liface);
+ machine_names.push_back(lmach);
+// }
+
+ vector<predicate_t *> schemaid_preds;
+ for(int irv=0;irv<tvec.size();++irv){
+
+ string schema_name = tvec[irv]->get_schema_name();
+ string rvar_name = tvec[irv]->get_var_name();
+ int schema_ref = tvec[irv]->get_schema_ref();
+ if (lmach == "")
+ lmach = hostname;
+// interface_names.push_back(liface);
+// machine_names.push_back(lmach);
+
+//printf("Machine is %s\n",lmach.c_str());
+
+// Check if a schemaId constraint needs to be inserted.
+ if(schema_ref<0){ // can result from some kinds of splits
+ schema_ref = Schema->get_table_ref(schema_name);
+ }
+ int schema_id = Schema->get_schema_id(schema_ref); // id associated with PROTOCOL
+ int errnum = 0;
+ string if_error;
+ iface_t *iface = ifaces_db-> get_interface(lmach, liface, errnum, if_error);
+ if(iface==NULL){
+ fprintf(stderr,"Error looking up the interface %s on host %s: %s\n",liface.c_str(), lmach.c_str(), if_error.c_str());
+ exit(1);
+ }
+
+
+ if(tvec[irv]->get_interface() != "_local_"){
+ if(iface->has_multiple_schemas()){
+ if(schema_id<0){ // invalid schema_id
+ fprintf(stderr,"Error, schema %s has no schema_id, but is sourced from multi-schema interface %s (ref'd in query %s)\n", schema_name.c_str(), liface.c_str(), split_queries[0]->query_name.c_str());
+ exit(1);
+ }
+ vector<string> iface_schemas = iface->get_property("Schemas");
+ if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
+ fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
+ exit(1);
+ }
+// Ensure that in liface, schema_id is used for only one schema
+ if(schema_of_schemaid.count(liface)==0){
+ map<int, string> empty_map;
+ schema_of_schemaid[liface] = empty_map;
+ }
+ if(schema_of_schemaid[liface].count(schema_id)==0){
+ schema_of_schemaid[liface][schema_id] = schema_name;
+ }else{
+ if(schema_of_schemaid[liface][schema_id] != schema_name){
+ fprintf(stderr, "ERROR, For interface %s, schema id %d is used in schemas %s and %s\n", liface.c_str(), schema_id, schema_name.c_str(), schema_of_schemaid[liface][schema_id].c_str());
+ exit(1);
+ }
+ }
+ }else{ // single-schema interface
+ schema_id = -1; // don't generate schema_id predicate
+ vector<string> iface_schemas = iface->get_property("Schemas");
+ if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
+ fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
+ exit(1);
+ }
+ if(iface_schemas.size()>1){
+ fprintf(stderr, "ERROR, interface %s is a single-schema interface, but has %d elements in its Schemas list.\n", liface.c_str(), (int)iface_schemas.size());
+ exit(1);
+ }
+ }
+ }else{
+ schema_id = -1;
+ }
+
+// If we need to check the schema_id, insert a predicate into the lfta.
+// TODO not just schema_id, the full all_schema_ids set.
+ if(schema_id>=0){
+ colref_t *schid_cr = new colref_t("schemaId");
+ schid_cr->schema_ref = schema_ref;
+ schid_cr->table_name = rvar_name;
+ schid_cr->tablevar_ref = 0;
+ schid_cr->default_table = false;
+ scalarexp_t *schid_se = new scalarexp_t(schid_cr);
+ data_type *schid_dt = new data_type("uint");
+ schid_se->dt = schid_dt;
+
+ string schid_str = int_to_string(schema_id);
+ literal_t *schid_lit = new literal_t(schid_str.c_str(), LITERAL_INT);
+ scalarexp_t *lit_se = new scalarexp_t(schid_lit);
+ lit_se->dt = schid_dt;
+
+ predicate_t *schid_pr = new predicate_t(schid_se, "=", lit_se);
+ vector<cnf_elem *> clist;
+ make_cnf_from_pr(schid_pr, clist);
+ analyze_cnf(clist[0]);
+ clist[0]->cost = 1; // cheap one comparison
+// cnf built, now insert it.
+ split_queries[l]->query_plan[0]->append_to_where(clist[0]);
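+// The appended clause is effectively (<rvar>.schemaId = <schema_id>), so at
+// runtime the lfta only accepts tuples carrying the expected schema id.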
+
+// Specialized processing
+// filter join, get two schemaid preds
+ string node_type = split_queries[l]->query_plan[0]->node_type();
+ if(node_type == "filter_join"){
+ filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
+ if(irv==0){
+ fj->pred_t0.push_back(clist[0]);
+ }else{
+ fj->pred_t1.push_back(clist[0]);
+ }
+ schemaid_preds.push_back(schid_pr);
+ }
+// watchlist join, get the first schemaid pred
+ if(node_type == "watch_join"){
+ watch_join_qpn *fj = (watch_join_qpn *)split_queries[l]->query_plan[0];
+ if(irv==0){
+ fj->pred_t0.push_back(clist[0]);
+ schemaid_preds.push_back(schid_pr);
+ }
+ }
+ }
+ }
+// Specialized processing, currently filter join.
+ if(schemaid_preds.size()>1){
+ string node_type = split_queries[l]->query_plan[0]->node_type();
+ if(node_type == "filter_join"){
+ filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
+ predicate_t *filter_pr = new predicate_t("OR", schemaid_preds[0], schemaid_preds[1]);
+ vector<cnf_elem *> clist;
+ make_cnf_from_pr(filter_pr, clist);
+ analyze_cnf(clist[0]);
+ clist[0]->cost = 1; // cheap one comparison
+ fj->shared_pred.push_back(clist[0]);
+ }
+ }
+
+
+
+
+
+
+
+// Set the ht size from the recommendation, if there is one in the rec file
+ if(lfta_htsize.count(split_queries[l]->query_name)>0){
+ split_queries[l]->query_plan[0]->set_definition("aggregate_slots",int_to_string(lfta_htsize[split_queries[l]->query_name]));
+ }
+
+
+ lfta_names[split_queries[l]->query_name] = lfta_list.size();
+ split_queries[l]->set_gid(lfta_list.size()); // set lfta global id
+ lfta_list.push_back(split_queries[l]);
+ lfta_mach_lists[lmach].push_back(split_queries[l]);
+
+// The following is a hack,
+// as I should be generating LFTA code through
+// the stream_query object.
+
+ split_queries[l]->query_plan[0]->bind_to_schema(Schema);
+
+// split_queries[l]->query_plan[0]->definitions = split_queries[l]->defines;
+
+/*
+// Create query description to embed in lfta.c
+ string lfta_schema_str = split_queries[l]->make_schema();
+ string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
+
+// get NIC capabilities.
+ int erri;
+ nic_property *nicprop = NULL;
+ vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
+ if(iface_codegen_type.size()){
+ nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
+ if(!nicprop){
+ fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
+ exit(1);
+ }
+ }
+
+ lfta_val[lmach] += generate_lfta_block(split_queries[l]->query_plan[0], Schema, split_queries[l]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop);
+*/
+
+ snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema, "snap"));
+ snap_position.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema, "index"));
+
+// STOPPED HERE need to figure out how to generate the code that Vlad needs
+// from snap_position
+
+// TODO NOTE : I'd like it to be the case that registration_query_names
+// are the queries to be registered for subscription,
+// but there is some complex bookkeeping here.
+ registration_query_names.push_back(split_queries[l]->query_name);
+ mach_query_names[lmach].push_back(registration_query_names.size()-1);
+// NOTE: I will assume a 1-1 correspondence between
+// mach_query_names[lmach] and lfta_mach_lists[lmach],
+// where mach_query_names[lmach][i] contains the index into
+// registration_query_names, which names the lfta, and
+// lfta_mach_lists[lmach][i] is the stream_query * of the
+// corresponding lfta.
+// Later, lfta_iface_qnames are the query names matching lfta_iface_lists
+
+
+
+ // check if lfta is reusable
+ // FTA is reusable if reusable option is set explicitly or it doesn't have any parameters
+
+ bool lfta_reusable = false;
+ if (split_queries[l]->query_plan[0]->get_val_of_def("reusable") == "yes" ||
+ split_queries[l]->query_plan[0]->get_param_tbl()->size() == 0) {
+ lfta_reusable = true;
+ }
+ lfta_reuse_options.push_back(lfta_reusable);
+
+	 // The LFTA will inherit the liveness timeout specification from the containing query.
+	 // This is somewhat too conservative, as lftas are expected to spend less time
+	 // per tuple than the full query.
+
+ // extract liveness timeout from query definition
+ int liveness_timeout = atoi(split_queries[l]->query_plan[0]->get_val_of_def("liveness_timeout").c_str());
+ if (!liveness_timeout) {
+// fprintf(stderr, "WARNING: No liveness timeout specified for lfta %s, using default value of %d\n",
+// split_queries[l]->query_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
+ liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
+ }
+ lfta_liveness_timeouts.push_back(liveness_timeout);
+
+// Add it to the schema
+ table_def *td = split_queries[l]->get_output_tabledef();
+ Schema->append_table(td);
+//printf("added lfta %d (%s)\n",l,split_queries[l]->query_name.c_str());
+
+ }
+ }
+
+// If the output is lfta-only, dump out the query name.
+ if(split_queries.size() == 1 && !hfta_returned){
+ if(output_query_names ){
+ fprintf(query_name_output,"%s L\n",split_queries[0]->query_name.c_str());
+ }
+/*
+else{
+ fprintf(stderr,"query name is %s\n",split_queries[0]->query_name.c_str());
+ }
+*/
+
+/*
+// output schema summary
+ if(output_schema_summary){
+ dump_summary(split_queries[0]);
+ }
+*/
+ }
+
+
+ if(hfta_returned){ // query also has an HFTA component
+ int hfta_nbr = split_queries.size()-1;
+
+ hfta_list.push_back(split_queries[hfta_nbr]);
+
+// report on generated query names
+ if(output_query_names){
+ string hfta_name =split_queries[hfta_nbr]->query_name;
+ fprintf(query_name_output,"%s H\n",hfta_name.c_str());
+ for(l=0;l<hfta_nbr;++l){
+ string lfta_name =split_queries[l]->query_name;
+ fprintf(query_name_output,"%s L\n",lfta_name.c_str());
+ }
+ }
+// else{
+// fprintf(stderr,"query names are ");
+// for(l=0;l<hfta_nbr;++l){
+// if(l>0) fprintf(stderr,",");
+// string fta_name =split_queries[l]->query_name;
+// fprintf(stderr," %s",fta_name.c_str());
+// }
+// fprintf(stderr,"\n");
+// }
+ }
+
+ }else{
+ fprintf(stderr,"ERROR, query in file %s has a bug.\n",qnodes[i]->file.c_str());
+ fprintf(stderr,"%s\n",rootq->get_error_str().c_str());
+ exit(1);
+ }
+ }
+}
+
+
+//-----------------------------------------------------------------
+// Compute and propagate the SEs which compute PROTOCOL fields.
+//-----------------------------------------------------------------
+
+for(i=0;i<lfta_list.size();i++){
+ lfta_list[i]->generate_protocol_se(sq_map, Schema);
+ sq_map[lfta_list[i]->get_sq_name()] = lfta_list[i];
+}
+for(i=0;i<hfta_list.size();i++){
+ hfta_list[i]->generate_protocol_se(sq_map, Schema);
+ sq_map[hfta_list[i]->get_sq_name()] = hfta_list[i];
+}
+
+
+
+//------------------------------------------------------------------------
+// Perform individual FTA optimizations
+//-----------------------------------------------------------------------
+
+if (partitioned_mode) {
+
+ // open partition definition file
+ string part_fname = config_dir_path + "partition.txt";
+
+ FILE* partfd = fopen(part_fname.c_str(), "r");
+ if (!partfd) {
+ fprintf(stderr,"ERROR, unable to open partition definition file %s\n",part_fname.c_str());
+ exit(1);
+ }
+ PartnParser_setfileinput(partfd);
+ if (PartnParserparse()) {
+ fprintf(stderr,"ERROR, unable to parse partition definition file %s\n",part_fname.c_str());
+ exit(1);
+ }
+ fclose(partfd);
+}
+
+
+print_hftas("preopt_hfta_info.txt", hfta_list, lfta_names, registration_query_names, interface_names);
+
+int num_hfta = hfta_list.size();
+for(i=0; i < hfta_list.size(); ++i){
+ hfta_list[i]->optimize(hfta_list, lfta_names, interface_names, machine_names, Ext_fcns, Schema, ifaces_db, partn_parse_result);
+}
+
+// Add all new hftas to schema
+for(i=num_hfta; i < hfta_list.size(); ++i){
+ table_def *td = hfta_list[i]->get_output_tabledef();
+ Schema->append_table(td);
+}
+
+print_hftas("postopt_hfta_info.txt", hfta_list, lfta_names, registration_query_names, interface_names);
+
+
+
+//------------------------------------------------------------------------
+// Do global (cross-fta) optimization
+//-----------------------------------------------------------------------
+
+
+
+
+
+
+set<string> extra_external_libs;
+
+for(i=0;i<hfta_list.size();++i){ // query also has an HFTA component
+
+ if(! debug_only){
+// build hfta file name, create output
+ if(numeric_hfta_flname){
+ sprintf(tmpstr,"hfta_%d",hfta_count);
+ hfta_names.push_back(tmpstr);
+ sprintf(tmpstr,"hfta_%d.cc",hfta_count);
+ }else{
+ sprintf(tmpstr,"hfta_%s",hfta_list[i]->query_name.c_str());
+ hfta_names.push_back(tmpstr);
+ sprintf(tmpstr,"hfta_%s.cc",hfta_list[i]->query_name.c_str());
+ }
+ FILE *hfta_fl = fopen(tmpstr,"w");
+ if(hfta_fl == NULL){
+ fprintf(stderr,"ERROR can't open fta output file %s\n",tmpstr);
+ exit(1);
+ }
+ fprintf(hfta_fl,"%s\n\n",hfta_list[i]->generate_hfta(Schema, Ext_fcns, opviews, distributed_mode).c_str());
+
+// If there is a field verifier, warn about
+// lack of compatibility.
+// NOTE : this code assumes that visible non-lfta queries
+// are those at the root of a stream query.
+ string hfta_comment;
+ string hfta_title;
+ string hfta_namespace;
+ if(hfta_list[i]->defines.count("comment")>0)
+ hfta_comment = hfta_list[i]->defines["comment"];
+ if(hfta_list[i]->defines.count("Comment")>0)
+ hfta_comment = hfta_list[i]->defines["Comment"];
+ if(hfta_list[i]->defines.count("COMMENT")>0)
+ hfta_comment = hfta_list[i]->defines["COMMENT"];
+ if(hfta_list[i]->defines.count("title")>0)
+ hfta_title = hfta_list[i]->defines["title"];
+ if(hfta_list[i]->defines.count("Title")>0)
+ hfta_title = hfta_list[i]->defines["Title"];
+ if(hfta_list[i]->defines.count("TITLE")>0)
+ hfta_title = hfta_list[i]->defines["TITLE"];
+ if(hfta_list[i]->defines.count("namespace")>0)
+ hfta_namespace = hfta_list[i]->defines["namespace"];
+ if(hfta_list[i]->defines.count("Namespace")>0)
+ hfta_namespace = hfta_list[i]->defines["Namespace"];
+ if(hfta_list[i]->defines.count("NAMESPACE")>0)
+ hfta_namespace = hfta_list[i]->defines["NAMESPACE"];
+
+ if(field_verifier != NULL){
+ string warning_str;
+ if(hfta_comment == "")
+ warning_str += "\tcomment not found.\n";
+
+// Obsolete stuff that Carsten wanted
+// if(hfta_title == "")
+// warning_str += "\ttitle not found.\n";
+// if(hfta_namespace == "")
+// warning_str += "\tnamespace not found.\n";
+
+// There is a get_tbl_keys method implemented for qp_nodes;
+// integrate it into stream_query, then call it to find keys,
+// and annotate fields with their key-ness.
+// If there is a "keys" property in the defines block, override anything returned
+// from the automated analysis.
+
+ vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
+ int fi;
+ for(fi=0;fi<flds.size();fi++){
+ field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
+ }
+ if(warning_str != "")
+ fprintf(stderr,"Warning, in HFTA stream %s:\n%s",
+ hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(),warning_str.c_str());
+ }
+
+// Get the fields in this query
+ vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
+
+// do key processing
+ string hfta_keys_s = "";
+ if(hfta_list[i]->defines.count("keys")>0)
+ hfta_keys_s = hfta_list[i]->defines["keys"];
+ if(hfta_list[i]->defines.count("Keys")>0)
+ hfta_keys_s = hfta_list[i]->defines["Keys"];
+ if(hfta_list[i]->defines.count("KEYS")>0)
+ hfta_keys_s = hfta_list[i]->defines["KEYS"];
+ string xtra_keys_s = "";
+ if(hfta_list[i]->defines.count("extra_keys")>0)
+ xtra_keys_s = hfta_list[i]->defines["extra_keys"];
+ if(hfta_list[i]->defines.count("Extra_Keys")>0)
+ xtra_keys_s = hfta_list[i]->defines["Extra_Keys"];
+ if(hfta_list[i]->defines.count("EXTRA_KEYS")>0)
+ xtra_keys_s = hfta_list[i]->defines["EXTRA_KEYS"];
+// get the keys
+ vector<string> hfta_keys;
+ vector<string> partial_keys;
+ vector<string> xtra_keys;
+ if(hfta_keys_s==""){
+ hfta_keys = hfta_list[i]->get_tbl_keys(partial_keys);
+ if(xtra_keys_s.size()>0){
+ xtra_keys = split_string(xtra_keys_s, ',');
+ }
+ for(int xi=0;xi<xtra_keys.size();++xi){
+ if(find(hfta_keys.begin(), hfta_keys.end(), xtra_keys[xi]) == hfta_keys.end()){
+ hfta_keys.push_back(xtra_keys[xi]);
+ }
+ }
+ }else{
+ hfta_keys = split_string(hfta_keys_s, ',');
+ }
+// validate that all of the keys exist in the output.
+// (exit on error, as it's a bad specification)
+ vector<string> missing_keys;
+ for(int ki=0;ki<hfta_keys.size(); ++ki){
+ int fi;
+ for(fi=0;fi<flds.size();++fi){
+ if(hfta_keys[ki] == flds[fi]->get_name())
+ break;
+ }
+ if(fi==flds.size())
+ missing_keys.push_back(hfta_keys[ki]);
+ }
+ if(missing_keys.size()>0){
+ fprintf(stderr, "Error, HFTA stream %s has keys defined which aren't in the output:", hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
+ for(int hi=0; hi<missing_keys.size(); ++hi){
+ fprintf(stderr," %s", missing_keys[hi].c_str());
+ }
+ fprintf(stderr,"\n");
+ exit(1);
+ }
+
+ fprintf(qtree_output,"\t<HFTA name='%s'>\n",hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
+ if(hfta_comment != "")
+ fprintf(qtree_output,"\t\t<Description value='%s' />\n",hfta_comment.c_str());
+ if(hfta_title != "")
+ fprintf(qtree_output,"\t\t<Title value='%s' />\n",hfta_title.c_str());
+ if(hfta_namespace != "")
+ fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",hfta_namespace.c_str());
+ fprintf(qtree_output,"\t\t<FileName value='%s' />\n",tmpstr);
+ fprintf(qtree_output,"\t\t<Rate value='100' />\n");
+
+// write info about fields to qtree.xml
+ int fi;
+ for(fi=0;fi<flds.size();fi++){
+ fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
+ if(flds[fi]->get_modifier_list()->size()){
+ fprintf(qtree_output,"mods='%s'", flds[fi]->get_modifier_list()->to_string().c_str());
+ }
+ fprintf(qtree_output," />\n");
+ }
+// info about keys
+ for(int hi=0;hi<hfta_keys.size();++hi){
+ fprintf(qtree_output, "\t\t<Key value='%s'/>\n",hfta_keys[hi].c_str());
+ }
+ for(int hi=0;hi<partial_keys.size();++hi){
+ fprintf(qtree_output, "\t\t<PartialKey value='%s'/>\n",partial_keys[hi].c_str());
+ }
+ for(int hi=0;hi<xtra_keys.size();++hi){
+ fprintf(qtree_output, "\t\t<ExtraKey value='%s'/>\n",xtra_keys[hi].c_str());
+ }
+
+
+ // extract liveness timeout from query definition
+ int liveness_timeout = atoi(hfta_list[i]->query_plan[hfta_list[i]->qhead]->get_val_of_def("liveness_timeout").c_str());
+ if (!liveness_timeout) {
+// fprintf(stderr, "WARNING: No liveness timeout specified for hfta %s, using default value of %d\n",
+// hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
+ liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
+ }
+ fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", liveness_timeout);
+
+ vector<tablevar_t *> tmp_tv = hfta_list[i]->get_input_tables();
+ int itv;
+ for(itv=0;itv<tmp_tv.size();++itv){
+ fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",tmp_tv[itv]->get_schema_name().c_str());
+ }
+ string ifrs = hfta_list[i]->collect_refd_ifaces();
+ if(ifrs != ""){
+ fprintf(qtree_output,"\t\t<Ifaces_referenced value='%s'/>\n",ifrs.c_str());
+ }
+ fprintf(qtree_output,"\t</HFTA>\n");
+
+ fclose(hfta_fl);
+ }else{
+// debug only -- do code generation to catch generation-time errors.
+ hfta_list[i]->generate_hfta(Schema, Ext_fcns,opviews, distributed_mode);
+ }
+
+ hfta_count++; // for hfta file names with numeric suffixes
+
+ hfta_list[i]->get_external_libs(extra_external_libs);
+
+ }
+
+string ext_lib_string;
+set<string>::iterator ssi_el;
+for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_el)
+ ext_lib_string += (*ssi_el)+" ";
+
+
+
+// Report on the set of operator views
+ for(i=0;i<opviews.size();++i){
+ opview_entry *opve = opviews.get_entry(i);
+ fprintf(qtree_output,"\t<UDOP name='%s' >\n",opve->view_name.c_str());
+ fprintf(qtree_output,"\t\t<FileName value='%s' />\n",opve->exec_fl.c_str());
+ fprintf(qtree_output,"\t\t<Parent value='%s' />\n",opve->root_name.c_str());
+ fprintf(qtree_output,"\t\t<Alias value='%s' />\n",opve->udop_alias.c_str());
+ fprintf(qtree_output,"\t\t<Rate value='100' />\n");
+
+ if (!opve->liveness_timeout) {
+// fprintf(stderr, "WARNING: No liveness timeout specified for view %s, using default value of %d\n",
+// opve->view_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
+ opve->liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
+ }
+ fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", opve->liveness_timeout);
+ int j;
+ for(j=0;j<opve->subq_names.size();j++)
+ fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",opve->subq_names[j].c_str());
+ fprintf(qtree_output,"\t</UDOP>\n");
+ }
+
+
+//-----------------------------------------------------------------
+
+// Create interface-specific meta code files.
+// first, open and parse the interface resources file.
+ ifaces_db = new ifq_t();
+ ierr = "";
+ if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
+ fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
+ ifx_fname.c_str(), ierr.c_str());
+ exit(1);
+ }
+
+ map<string, vector<stream_query *> >::iterator svsi;
+ for(svsi=lfta_mach_lists.begin(); svsi!=lfta_mach_lists.end(); ++svsi){
+ string lmach = (*svsi).first;
+
+ // For this machine, create a set of lftas per interface.
+ vector<stream_query *> mach_lftas = (*svsi).second;
+ map<string, vector<stream_query *> > lfta_iface_lists;
+ int li;
+ for(li=0;li<mach_lftas.size();++li){
+ vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
+ string lfta_iface = "_local_";
+ if(tvec.size()>0){
+			lfta_iface = tvec[0]->get_interface();
+ }
+ lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
+ }
+
+ map<string, vector<stream_query *> >::iterator lsvsi;
+ for(lsvsi=lfta_iface_lists.begin(); lsvsi!=lfta_iface_lists.end(); ++lsvsi){
+ int erri;
+ string liface = (*lsvsi).first;
+ vector<stream_query *> iface_lftas = (*lsvsi).second;
+ vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
+ if(iface_codegen_type.size()){
+ nic_property *nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
+ if(!nicprop){
+ fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
+ exit(1);
+ }
+ string mcs = generate_nic_code(iface_lftas, nicprop);
+ string mcf_flnm;
+ if(lmach != "")
+ mcf_flnm = lmach + "_"+liface+".mcf";
+ else
+ mcf_flnm = hostname + "_"+liface+".mcf";
+ FILE *mcf_fl ;
+ if((mcf_fl = fopen(mcf_flnm.c_str(),"w")) == NULL){
+ fprintf(stderr,"Can't open output file %s\n%s\n",mcf_flnm.c_str(),strerror(errno));
+ exit(1);
+ }
+ fprintf(mcf_fl,"%s",mcs.c_str());
+ fclose(mcf_fl);
+//printf("mcs of machine %s, iface %s of type %s is \n%s\n",
+//lmach.c_str(), liface.c_str(),iface_codegen_type[0].c_str(), mcs.c_str());
+ }
+ }
+
+
+ }
+
+
+
+//-----------------------------------------------------------------
+
+
+// Find common filter predicates in the LFTAs.
+// In addition, generate structs to store the
+// temporal attributes unpacked by the prefilter,
+// and compute & provide an interface for per-interface
+// record extraction properties.
+
+ map<string, vector<stream_query *> >::iterator ssqi;
+ for(ssqi=lfta_mach_lists.begin(); ssqi!=lfta_mach_lists.end(); ++ssqi){
+
+ string lmach = (*ssqi).first;
+ bool packed_return = false;
+ int li, erri;
+
+
+// The LFTAs of this machine.
+ vector<stream_query *> mach_lftas = (*ssqi).second;
+// break up on a per-interface basis.
+ map<string, vector<stream_query *> > lfta_iface_lists;
+ map<string, vector<int> > lfta_iface_qname_ix; // need the query name
+ // for fta_init
+ for(li=0;li<mach_lftas.size();++li){
+ vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
+ string lfta_iface = "_local_";
+ if(tvec.size()>0){
+ lfta_iface = tvec[0]->get_interface();
+ }
+ lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
+ lfta_iface_qname_ix[lfta_iface].push_back(mach_query_names[lmach][li]);
+ }
+
+
+// Are the return values "packed"?
+// This should be done on a per-interface basis.
+// But this is defunct code for gs-lite
+ for(li=0;li<mach_lftas.size();++li){
+ vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
+ string liface = "_local_";
+ if(tvec.size()>0){
+ liface = tvec[0]->get_interface();
+ }
+ vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
+ if(iface_codegen_type.size()){
+ if(npdb->get_nic_prop_val(iface_codegen_type[0],"Return",erri) == "Packed"){
+ packed_return = true;
+ }
+ }
+ }
+
+
+// Separate lftas by interface, collect results on a per-interface basis.
+
+ vector<cnf_set *> no_preds; // fallback if there is no prefilter
+ map<string, vector<cnf_set *> > prefilter_preds;
+ set<unsigned int> pred_ids; // this can be global for all interfaces
+ for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
+ string liface = (*mvsi).first;
+ vector<cnf_set *> empty_list;
+ prefilter_preds[liface] = empty_list;
+ if(! packed_return){
+ get_common_lfta_filter(lfta_iface_lists[liface], Schema,Ext_fcns, prefilter_preds[liface], pred_ids);
+ }
+
+// get NIC capabilities. (Is this needed?)
+ nic_property *nicprop = NULL;
+ vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
+ if(iface_codegen_type.size()){
+ nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
+ if(!nicprop){
+ fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
+ exit(1);
+ }
+ }
+ }
+
+
+// Now that we know the prefilter preds, generate the lfta code.
+// Do this for all lftas in this machine.
+ for(li=0;li<mach_lftas.size();++li){
+ set<unsigned int> subsumed_preds;
+ set<unsigned int>::iterator sii;
+#ifdef PREFILTER_OK
+ for(sii=pred_ids.begin();sii!=pred_ids.end();++sii){
+ int pid = (*sii);
+ if((pid>>16) == li){
+ subsumed_preds.insert(pid & 0xffff);
+ }
+ }
+#endif
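+// subsumed_preds now holds, for this lfta, the low 16 bits of every
+// prefilter predicate id whose high 16 bits name this lfta (li), i.e. the
+// predicates already evaluated by the shared prefilter; they are passed to
+// generate_lfta_block below.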
+ string lfta_schema_str = mach_lftas[li]->make_schema();
+ string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
+ nic_property *nicprop = NULL; // no NIC properties?
+ lfta_val[lmach] += generate_lfta_block(mach_lftas[li]->query_plan[0], Schema, mach_lftas[li]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop,subsumed_preds);
+ }
+
+
+// generate structs to store the temporal attributes
+// unpacked by prefilter
+ col_id_set temp_cids;
+ get_prefilter_temporal_cids(lfta_mach_lists[lmach], temp_cids);
+ lfta_prefilter_val[lmach] = generate_lfta_prefilter_struct(temp_cids, Schema);
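+// lfta_prefilter_val[lmach] accumulates declarations emitted near the top of
+// this machine's <host>_lfta.c, ahead of lfta_val[lmach] (see where
+// lfta_header, lfta_prefilter_val and lfta_val are written out below).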
+
+// Compute the lfta bit signatures and the lfta colrefs
+// do this on a per-interface basis
+#ifdef PREFILTER_OK
+ lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
+#endif
+ map<string, vector<long long int> > lfta_sigs; // used again later
+ map<string, int> lfta_snap_pos; // optimize csv parsing
+ // compute now, use in get_iface_properties
+ for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
+ string liface = (*mvsi).first;
+ vector<long long int> empty_list;
+ lfta_sigs[liface] = empty_list;
+ lfta_snap_pos[liface] = -1;
+
+ vector<col_id_set> lfta_cols;
+ vector<int> lfta_snap_length;
+ for(li=0;li<lfta_iface_lists[liface].size();++li){
+ unsigned long long int mask=0, bpos=1;
+ int f_pos;
+ for(f_pos=0;f_pos<prefilter_preds[liface].size();++f_pos){
+ if(prefilter_preds[liface][f_pos]->lfta_id.count(li))
+ mask |= bpos;
+ bpos = bpos << 1;
+ }
+ lfta_sigs[liface].push_back(mask);
+ lfta_cols.push_back(lfta_iface_lists[liface][li]->query_plan[0]->get_colrefs(true,Schema));
+ lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema, "snap"));
+ int this_snap_pos = compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema, "index");
+ if(this_snap_pos > lfta_snap_pos[liface])
+ lfta_snap_pos[liface] = this_snap_pos;
+ }
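+// lfta_sigs[liface][li] is this lfta's prefilter signature: bit f_pos is set
+// iff prefilter predicate f_pos of this interface appears in lfta li. It is
+// later passed as the last argument of fta_register (see fta_init below).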
+
+//for(li=0;li<mach_lftas.size();++li){
+//printf("lfta %d, msak is %llu\n",li,lfta_sigs[li]);
+//col_id_set::iterator tcisi;
+//for(tcisi=lfta_cols[li].begin(); tcisi!=lfta_cols[li].end();++tcisi){
+//printf("\t%s %d (%d)\n",(*tcisi).field.c_str(),(*tcisi).schema_ref, (*tcisi).tblvar_ref);
+//}
+//}
+
+
+// generate the prefilter
+// Do this on a per-interface basis, except for the #define
+#ifdef PREFILTER_OK
+// lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
+ lfta_val[lmach] += generate_lfta_prefilter(prefilter_preds[liface], temp_cids, Schema, Ext_fcns, lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
+#else
+ lfta_val[lmach] += generate_lfta_prefilter(no_preds, temp_cids, Schema,Ext_fcns, lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
+
+#endif
+ }
+
+// Generate interface parameter lookup function
+ lfta_val[lmach] += "// lookup interface properties by name\n";
+	lfta_val[lmach] += "// multiple values of given property returned in the same string separated by commas\n";
+ lfta_val[lmach] += "// returns NULL if given property does not exist\n";
+ lfta_val[lmach] += "gs_sp_t get_iface_properties (const gs_sp_t iface_name, const gs_sp_t property_name) {\n";
+
+// collect a list of interface names used by queries running on this host
+ set<std::string> iface_names;
+ for(i=0;i<mach_query_names[lmach].size();i++){
+ int mi = mach_query_names[lmach][i];
+ stream_query *lfta_sq = lfta_mach_lists[lmach][i];
+
+ if(interface_names[mi]=="")
+ iface_names.insert("DEFAULTDEV");
+ else
+ iface_names.insert(interface_names[mi]);
+ }
+
+// generate interface property lookup code for every interface
+ set<std::string>::iterator sir;
+ for (sir = iface_names.begin(); sir != iface_names.end(); sir++) {
+ if (sir == iface_names.begin())
+ lfta_val[lmach] += "\tif (!strcmp(iface_name, \"" + *sir + "\")) {\n";
+ else
+ lfta_val[lmach] += "\t} else if (!strcmp(iface_name, \"" + *sir + "\")) {\n";
+
+ // iterate through interface properties
+ vector<string> iface_properties;
+ if(*sir!="_local_"){ // dummy watchlist interface, don't process.
+ iface_properties = ifaces_db->get_iface_properties(lmach,*sir,erri,err_str);
+ }
+ if (erri) {
+ fprintf(stderr,"ERROR cannot retrieve interface properties for %s.%s, %s\n",lmach.c_str(), sir->c_str(), err_str.c_str());
+ exit(1);
+ }
+ if (iface_properties.empty())
+ lfta_val[lmach] += "\t\treturn NULL;\n";
+ else {
+ for (int i = 0; i < iface_properties.size(); ++i) {
+ if (i == 0)
+ lfta_val[lmach] += "\t\tif (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
+ else
+ lfta_val[lmach] += "\t\t} else if (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
+
+ // combine all values for the interface property using comma separator
+ vector<string> vals = ifaces_db->get_iface_vals(lmach,*sir,iface_properties[i],erri,err_str);
+ lfta_val[lmach] += "\t\t\treturn \"";
+ for (int j = 0; j < vals.size(); ++j) {
+ lfta_val[lmach] += vals[j];
+ if (j != vals.size()-1)
+ lfta_val[lmach] += ",";
+ }
+ lfta_val[lmach] += "\";\n";
+ }
+ lfta_val[lmach] += "\t\t}else if(!strcmp(property_name, \"_max_csv_pos\")){\n";
+ lfta_val[lmach] += "\t\t\treturn \""+int_to_string(lfta_snap_pos[(*sir)])+"\";\n";
+ lfta_val[lmach] += "\t\t} else\n";
+ lfta_val[lmach] += "\t\t\treturn NULL;\n";
+ }
+ }
+ lfta_val[lmach] += "\t} else\n";
+ lfta_val[lmach] += "\t\treturn NULL;\n";
+ lfta_val[lmach] += "}\n\n";
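+// For illustration only (interface/property names and values are hypothetical),
+// the generated lookup has roughly this shape:
+//	gs_sp_t get_iface_properties (const gs_sp_t iface_name, const gs_sp_t property_name) {
+//		if (!strcmp(iface_name, "eth0")) {
+//			if (!strcmp(property_name, "Schemas")) {
+//				return "csv";
+//			}else if(!strcmp(property_name, "_max_csv_pos")){
+//				return "7";
+//			} else
+//				return NULL;
+//		} else
+//			return NULL;
+//	}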
+
+
+// Generate a full list of FTAs for clearinghouse reference
+ lfta_val[lmach] += "// list of FTAs clearinghouse expects to register themselves\n";
+ lfta_val[lmach] += "const gs_sp_t fta_names[] = {";
+
+ bool first = true;
+ for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
+ string liface = (*mvsi).first;
+ if(liface != "_local_"){ // these don't register themselves
+ vector<stream_query *> lfta_list = (*mvsi).second;
+ for(i=0;i<lfta_list.size();i++){
+ int mi = lfta_iface_qname_ix[liface][i];
+ if(first) first = false;
+ else lfta_val[lmach] += ", ";
+ lfta_val[lmach] += "\"" + registration_query_names[mi] + "\"";
+ }
+ }
+ }
+// for (i = 0; i < registration_query_names.size(); ++i) {
+// if (i)
+// lfta_val[lmach] += ", ";
+// lfta_val[lmach] += "\"" + registration_query_names[i] + "\"";
+// }
+
+ for (i = 0; i < hfta_list.size(); ++i) {
+ lfta_val[lmach] += ", \"" + hfta_list[i]->query_name + "\"";
+ }
+ lfta_val[lmach] += ", NULL};\n\n";
+
+
+// Add the initialization function to lfta.c
+// Change to accept the interface name, and
+// set the prefilter function accordingly.
+// see the example in demo/err2
+ lfta_val[lmach] += "void fta_init(gs_sp_t device){\n";
+ lfta_val[lmach] += "// note: the last parameter in fta_register is the prefilter signature\n";
+
+// for(i=0;i<mach_query_names[lmach].size();i++)
+// int mi = mach_query_names[lmach][i];
+// stream_query *lfta_sq = lfta_mach_lists[lmach][i];
+
+ for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
+ string liface = (*mvsi).first;
+ vector<stream_query *> lfta_list = (*mvsi).second;
+ for(i=0;i<lfta_list.size();i++){
+ stream_query *lfta_sq = lfta_list[i];
+ int mi = lfta_iface_qname_ix[liface][i];
+
+ if(liface == "_local_"){
+// Don't register an init function, do the init code inline
+ lfta_val[lmach] += "// initialization code for "+registration_query_names[mi]+";\n";
+ lfta_val[lmach] += "\t"+generate_watchlist_name(registration_query_names[mi])+".filename = "+generate_watchlist_name(registration_query_names[mi])+"__fstr;\n";
+ continue;
+ }
+
+ fprintf(stderr,"interface %s, LFTA %d, snap length is %d\n",liface.c_str(),mi,snap_lengths[mi]);
+
+ string this_iface = "DEFAULTDEV";
+ if(interface_names[mi]!="")
+ this_iface = '"'+interface_names[mi]+'"';
+ lfta_val[lmach] += "\tif(!strcmp(device,"+this_iface+"))\n";
+ lfta_val[lmach] += "\t\tfta_register(\""+registration_query_names[mi]+"\", " + (lfta_reuse_options[mi]?"1":"0") + ", ";
+// if(interface_names[mi]=="")
+// lfta_val[lmach]+="DEFAULTDEV";
+// else
+// lfta_val[lmach]+='"'+interface_names[mi]+'"';
+ lfta_val[lmach] += this_iface;
+
+
+ lfta_val[lmach] += ", "+generate_alloc_name(registration_query_names[mi])
+ +"\n#ifndef LFTA_IN_NIC\n\t,"+generate_schema_string_name(registration_query_names[mi])
+ +"\n#endif\n";
+ sprintf(tmpstr,",%d",snap_lengths[mi]);
+ lfta_val[lmach] += tmpstr;
+
+// unsigned long long int mask=0, bpos=1;
+// int f_pos;
+// for(f_pos=0;f_pos<prefilter_preds.size();++f_pos){
+// if(prefilter_preds[f_pos]->lfta_id.count(i))
+// mask |= bpos;
+// bpos = bpos << 1;
+// }
+
+#ifdef PREFILTER_OK
+// sprintf(tmpstr,",%lluull",mask);
+ sprintf(tmpstr,",%lluull",lfta_sigs[liface][i]);
+ lfta_val[lmach]+=tmpstr;
+#else
+ lfta_val[lmach] += ",0ull";
+#endif
+
+ lfta_val[lmach] += ");\n";
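+// For illustration only (query, interface, and generated symbol names are
+// hypothetical, and the schema-string argument is actually wrapped in
+// #ifndef LFTA_IN_NIC), each non-_local_ lfta yields roughly:
+//	if(!strcmp(device,"eth0"))
+//		fta_register("example_lfta", 1, "eth0", alloc_example_lfta, example_lfta_schema_string, 128, 0ull);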
+
+
+
+// End of lfta prefilter stuff
+// --------------------------------------------------
+
+// If there is a field verifier, warn about
+// lack of compatibility.
+ string lfta_comment;
+ string lfta_title;
+ string lfta_namespace;
+ map<string,string> ldefs = lfta_sq->query_plan[0]->get_definitions();
+ if(ldefs.count("comment")>0)
+ lfta_comment = lfta_sq->defines["comment"];
+ if(ldefs.count("Comment")>0)
+ lfta_comment = lfta_sq->defines["Comment"];
+ if(ldefs.count("COMMENT")>0)
+ lfta_comment = lfta_sq->defines["COMMENT"];
+ if(ldefs.count("title")>0)
+ lfta_title = lfta_sq->defines["title"];
+ if(ldefs.count("Title")>0)
+ lfta_title = lfta_sq->defines["Title"];
+ if(ldefs.count("TITLE")>0)
+ lfta_title = lfta_sq->defines["TITLE"];
+ if(ldefs.count("NAMESPACE")>0)
+ lfta_namespace = lfta_sq->defines["NAMESPACE"];
+ if(ldefs.count("Namespace")>0)
+ lfta_namespace = lfta_sq->defines["Namespace"];
+ if(ldefs.count("namespace")>0)
+ lfta_namespace = lfta_sq->defines["namespace"];
+
+ string lfta_ht_size;
+ if(lfta_sq->query_plan[0]->node_type()== "sgah_qpn" || lfta_sq->query_plan[0]->node_type()== "rsgah_qpn")
+ lfta_ht_size = int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE);
+ if(ldefs.count("aggregate_slots")>0){
+ lfta_ht_size = ldefs["aggregate_slots"];
+ }
+
+// NOTE : I'm assuming that visible lftas do not start with _fta.
+// -- will fail for non-visible simple selection queries.
+ if(field_verifier != NULL && registration_query_names[mi].substr(0,1) != "_"){
+ string warning_str;
+ if(lfta_comment == "")
+ warning_str += "\tcomment not found.\n";
+// Obsolete stuff that carsten wanted
+// if(lfta_title == "")
+// warning_str += "\ttitle not found.\n";
+// if(lfta_namespace == "")
+// warning_str += "\tnamespace not found.\n";
+
+ vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
+ int fi;
+ for(fi=0;fi<flds.size();fi++){
+ field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
+ }
+ if(warning_str != "")
+ fprintf(stderr,"Warning, in LFTA stream %s:\n%s",
+ registration_query_names[mi].c_str(),warning_str.c_str());
+ }
+
+
+// Create qtree output
+ fprintf(qtree_output,"\t<LFTA name='%s' >\n",registration_query_names[mi].c_str());
+ if(lfta_comment != "")
+ fprintf(qtree_output,"\t\t<Description value='%s' />\n",lfta_comment.c_str());
+ if(lfta_title != "")
+ fprintf(qtree_output,"\t\t<Title value='%s' />\n",lfta_title.c_str());
+ if(lfta_namespace != "")
+ fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",lfta_namespace.c_str());
+ if(lfta_ht_size != "")
+ fprintf(qtree_output,"\t\t<HtSize value='%s' />\n",lfta_ht_size.c_str());
+ if(lmach != "")
+ fprintf(qtree_output,"\t\t<Host value='%s' />\n",lmach.c_str());
+ else
+ fprintf(qtree_output,"\t\t<Host value='%s' />\n",hostname.c_str());
+ fprintf(qtree_output,"\t\t<Interface value='%s' />\n",interface_names[mi].c_str());
+ std::vector<tablevar_t *> itbls = lfta_sq->get_input_tables();
+ for(int t=0;t<itbls.size();++t){
+ fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",itbls[t]->get_schema_name().c_str());
+ }
+// fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",interface_names[mi].c_str());
+ fprintf(qtree_output,"\t\t<Rate value='100' />\n");
+ fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", lfta_liveness_timeouts[mi]);
+// write info about fields to qtree.xml
+ vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
+ int fi;
+ for(fi=0;fi<flds.size();fi++){
+ fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
+ if(flds[fi]->get_modifier_list()->size()){
+ fprintf(qtree_output,"mods='%s' ", flds[fi]->get_modifier_list()->to_string().c_str());
+ }
+ fprintf(qtree_output," />\n");
+ }
+ fprintf(qtree_output,"\t</LFTA>\n");
+
+
+ }
+ }
+
+ for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
+ string liface = (*mvsi).first;
+ lfta_val[lmach] +=
+" if (!strcmp(device, \""+liface+"\")) \n"
+" lfta_prefilter = &lfta_prefilter_"+liface+"; \n"
+;
+ }
+ lfta_val[lmach] +=
+" if(lfta_prefilter == NULL){\n"
+" fprintf(stderr, \"Error, interface %s not found in fta_init.\\n\",device);\n"
+" exit(1);\n"
+" }\n"
+;
+
+
+
+ lfta_val[lmach] += "}\n\n";
+
+ if(!(debug_only || hfta_only) ){
+ string lfta_flnm;
+ if(lmach != "")
+ lfta_flnm = lmach + "_lfta.c";
+ else
+ lfta_flnm = hostname + "_lfta.c";
+ if((lfta_out = fopen(lfta_flnm.c_str(),"w")) == NULL){
+ fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
+ exit(1);
+ }
+ fprintf(lfta_out,"%s",lfta_header.c_str());
+ fprintf(lfta_out,"%s",lfta_prefilter_val[lmach].c_str());
+ fprintf(lfta_out,"%s",lfta_val[lmach].c_str());
+ fclose(lfta_out);
+ }
+ }
+
+// Report which external operators must execute
+ if(opviews.size()>0)
+ fprintf(stderr,"The queries use the following external operators:\n");
+ for(i=0;i<opviews.size();++i){
+ opview_entry *opv = opviews.get_entry(i);
+ fprintf(stderr,"\t%s\n",opv->view_name.c_str());
+ }
+
+ if(create_makefile)
+ generate_makefile(input_file_names, nfiles, hfta_names, opviews,
+ machine_names, schema_file_name,
+ interface_names,
+ ifaces_db, config_dir_path,use_pads,ext_lib_string, rts_hload);
+
+
+ fprintf(qtree_output,"</QueryNodes>\n");
+
+ return(0);
+}
+
+////////////////////////////////////////////////////////////
+
+void generate_makefile(vector<string> &input_file_names, int nfiles,
+ vector<string> &hfta_names, opview_set &opviews,
+ vector<string> &machine_names,
+ string schema_file_name,
+ vector<string> &interface_names,
+ ifq_t *ifdb, string &config_dir_path,
+ bool use_pads,
+ string extra_libs,
+ map<string, vector<int> > &rts_hload
+ ){
+ int i,j;
+
+ if(config_dir_path != ""){
+ config_dir_path = "-C "+config_dir_path;
+ }
+
+ struct stat sb;
+ bool libz_exists = stat((root_path+"/lib/libz.a").c_str(),&sb) >= 0;
+ bool libast_exists = stat((root_path+"/lib/libast.a").c_str(),&sb) >= 0;
+
+// if(libz_exists && !libast_exists){
+// fprintf(stderr,"Configuration ERROR: ../../lib/libz.a exists but ../../lib/libast.a dows not.\n");
+// exit(1);
+// }
+
+// Get set of operator executable files to run
+ set<string> op_fls;
+ set<string>::iterator ssi;
+ for(i=0;i<opviews.size();++i){
+ opview_entry *opv = opviews.get_entry(i);
+ if(opv->exec_fl != "") op_fls.insert(opv->exec_fl);
+ }
+
+ FILE *outfl = fopen("Makefile", "w");
+ if(outfl==NULL){
+ fprintf(stderr,"Can't open Makefile for write, exiting.\n");
+ exit(0);
+ }
+
+ fputs(
+("CPP= g++ -O3 -g -I "+root_path+"/include -I "+root_path+"/include/hfta\n"
+"CC= gcc -O3 -g -I . -I "+root_path+"/include -I "+root_path+"/include/lfta"
+).c_str(), outfl
+);
+ if(generate_stats)
+ fprintf(outfl," -DLFTA_STATS");
+
+// Gather the set of interfaces
+// Also, gather "base interface names" for use in computing
+// the hash splitting to virtual interfaces.
+// TODO : must update to handle machines
+ set<string> ifaces;
+ set<string> base_vifaces; // base interfaces of virtual interfaces
+ map<string, string> ifmachines;
+ map<string, string> ifattrs;
+ for(i=0;i<interface_names.size();++i){
+ ifaces.insert(interface_names[i]);
+ ifmachines[interface_names[i]] = machine_names[i];
+
+ size_t Xpos = interface_names[i].find_last_of("X");
+ if(Xpos!=string::npos){
+ string iface = interface_names[i].substr(0,Xpos);
+ base_vifaces.insert(iface);
+ }
+ // get interface attributes and add them to the list
+ }
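+// A "base interface name" is everything before the last 'X' in the interface
+// name; e.g. (hypothetical) virtual interface "eth0X2" has base interface "eth0".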
+
+// Do we need to include protobuf libraries?
+// TODO Move to the interface library: get the libraries to include
+// for an interface type
+
+ bool use_proto = false;
+ bool use_bsa = false;
+ bool use_kafka = false;
+ bool use_ssl = false;
+ int erri;
+ string err_str;
+ for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
+ string ifnm = (*ssi);
+ vector<string> ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "InterfaceType", erri, err_str);
+ for(int ift_i=0;ift_i<ift.size();ift_i++){
+ if(ift[ift_i]=="PROTO"){
+#ifdef PROTO_ENABLED
+ use_proto = true;
+#else
+ fprintf(stderr,"Runtime libraries built without PROTO support. Rebuild with PROTO_ENABLED defined in gsoptions.h\n");
+ exit(0);
+#endif
+ }
+ }
+ ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "BSA", erri, err_str);
+ for(int ift_i=0;ift_i<ift.size();ift_i++){
+ if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){
+#ifdef BSA_ENABLED
+ use_bsa = true;
+#else
+ fprintf(stderr,"Runtime libraries built without BSA support. Rebuild with BSA_ENABLED defined in gsoptions.h\n");
+ exit(0);
+#endif
+ }
+ }
+ ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "KAFKA", erri, err_str);
+ for(int ift_i=0;ift_i<ift.size();ift_i++){
+ if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){
+#ifdef KAFKA_ENABLED
+ use_kafka = true;
+#else
+ fprintf(stderr,"Runtime libraries built without KAFKA support. Rebuild with KAFKA_ENABLED defined in gsoptions.h\n");
+ exit(0);
+#endif
+ }
+ }
+ ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "ENCRYPTED", erri, err_str);
+ for(int ift_i=0;ift_i<ift.size();ift_i++){
+ if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){
+#ifdef SSL_ENABLED
+ use_ssl = true;
+#else
+ fprintf(stderr,"Runtime libraries built without SSL support. Rebuild with SSL_ENABLED defined in gsoptions.h\n");
+ exit(0);
+#endif
+ }
+ }
+ }
+
+ fprintf(outfl,
+"\n"
+"\n"
+"all: rts");
+ for(i=0;i<hfta_names.size();++i)
+ fprintf(outfl," %s",hfta_names[i].c_str());
+ fputs(
+("\n"
+"\n"
+"rts: lfta.o "+root_path+"/lib/libgscphost.a "+root_path+"/lib/libgscplftaaux.a "+root_path+"/lib/libgscprts.a "+root_path+"/lib/libclearinghouse.a\n"
+"\tg++ -O3 -g -o rts lfta.o ").c_str(), outfl);
+ if(use_pads)
+ fprintf(outfl,"-L. ");
+ fputs(
+("-L"+root_path+"/lib -lgscplftaaux ").c_str(), outfl);
+ if(use_pads)
+ fprintf(outfl,"-lgscppads -lpads ");
+ fprintf(outfl,
+"-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux -lclearinghouse -lresolv -lpthread -lgscpinterface -lz -lrt");
+ if(use_pads)
+ fprintf(outfl, " -lpz -lz -lbz ");
+ if(libz_exists && libast_exists)
+ fprintf(outfl," -last ");
+ if(use_pads)
+ fprintf(outfl, " -ldll -ldl ");
+
+#ifdef PROTO_ENABLED
+ fprintf(outfl, " -L/usr/local/lib/ -lprotobuf-c ");
+#endif
+#ifdef BSA_ENABLED
+ fprintf(outfl, " -lbsa_stream ");
+#endif
+#ifdef KAFKA_ENABLED
+ fprintf(outfl, " -lrdkafka ");
+#endif
+#ifdef SSL_ENABLED
+ fprintf(outfl, " -lssl -lcrypto ");
+#endif
+ fprintf(outfl," -lgscpaux");
+#ifdef GCOV
+ fprintf(outfl," -fprofile-arcs");
+#endif
+ fprintf(outfl,
+"\n"
+"\n"
+"lfta.o: %s_lfta.c\n"
+"\t$(CC) -o lfta.o -c %s_lfta.c\n"
+"\n"
+"%s_lfta.c: external_fcns.def %s ",hostname.c_str(), hostname.c_str(), hostname.c_str(),schema_file_name.c_str());
+ for(i=0;i<nfiles;++i)
+ fprintf(outfl," %s",input_file_names[i].c_str());
+ if(hostname == ""){
+ fprintf(outfl,"\n\t%s/bin/translate_fta %s %s ",root_path.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
+ }else{
+ fprintf(outfl,"\n\t%s/bin/translate_fta -h %s %s %s ", root_path.c_str(), hostname.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
+ }
+ for(i=0;i<nfiles;++i)
+ fprintf(outfl," %s",input_file_names[i].c_str());
+ fprintf(outfl,"\n");
+
+ for(i=0;i<hfta_names.size();++i)
+ fprintf(outfl,
+("%s: %s.o\n"
+"\t$(CPP) -o %s %s.o -L"+root_path+"/lib -lgscpapp -lgscphostaux -lgscphost -lgscpinterface -lgscphftaaux -lgscphostaux -lm -lgscpaux -lclearinghouse -lresolv -lpthread -lgscpaux -lgscphftaaux -lgscpaux %s\n"
+"\n"
+"%s.o: %s.cc\n"
+"\t$(CPP) -o %s.o -c %s.cc\n"
+"\n"
+"\n").c_str(),
+ hfta_names[i].c_str(), hfta_names[i].c_str(),
+ hfta_names[i].c_str(), hfta_names[i].c_str(), extra_libs.c_str(),
+ hfta_names[i].c_str(), hfta_names[i].c_str(),
+ hfta_names[i].c_str(), hfta_names[i].c_str()
+ );
+
+ fprintf(outfl,
+("\n"
+"packet_schema.txt:\n"
+"\tln -s "+root_path+"/cfg/packet_schema.txt .\n"
+"\n"
+"external_fcns.def:\n"
+"\tln -s "+root_path+"/cfg/external_fcns.def .\n"
+"\n"
+"clean:\n"
+"\trm -rf core rts *.o %s_lfta.c external_fcns.def packet_schema.txt").c_str(),hostname.c_str());
+ for(i=0;i<hfta_names.size();++i)
+ fprintf(outfl," %s %s.cc",hfta_names[i].c_str(),hfta_names[i].c_str());
+ fprintf(outfl,"\n");
+
+ fclose(outfl);
+
+
+
+// Gather the set of interfaces
+// TODO : must update to handle machines
+// TODO : lookup interface attributes and add them as a parameter to rts process
+ outfl = fopen("runit", "w");
+ if(outfl==NULL){
+ fprintf(stderr,"Can't open runit for write, exiting.\n");
+ exit(0);
+ }
+
+
+ fputs(
+("#!/bin/sh\n"
+"./stopit\n"
++root_path+"/bin/gshub.py> /dev/null 2>&1 &\n"
+"sleep 5\n"
+"if [ ! -f gshub.log ]\n"
+"then\n"
+"\techo \"Failed to start bin/gshub.py\"\n"
+"\texit -1\n"
+"fi\n"
+"ADDR=`cat gshub.log`\n"
+"ps opgid= $! >> gs.pids\n"
+"./rts $ADDR default ").c_str(), outfl);
+// int erri;
+// string err_str;
+ for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
+ string ifnm = (*ssi);
+ // suppress internal _local_ interface
+ if (ifnm == "_local_")
+ continue;
+ fprintf(outfl, "%s ",ifnm.c_str());
+ vector<string> ifv = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "Command", erri, err_str);
+ for(j=0;j<ifv.size();++j)
+ fprintf(outfl, "%s ",ifv[j].c_str());
+ }
+ fprintf(outfl, " &\n");
+ fprintf(outfl, "echo $! >> gs.pids\n");
+ for(i=0;i<hfta_names.size();++i)
+ fprintf(outfl,"./%s $ADDR default &\n",hfta_names[i].c_str());
+
+ for(j=0;j<opviews.opview_list.size();++j){
+ fprintf(outfl,"%s/views/%s %s &\n",root_path.c_str(),opviews.opview_list[j]->exec_fl.c_str(), opviews.opview_list[j]->mangler.c_str());
+ }
+
+ fclose(outfl);
+ system("chmod +x runit");
+
+ outfl = fopen("stopit", "w");
+ if(outfl==NULL){
+ fprintf(stderr,"Can't open stopit for write, exiting.\n");
+ exit(0);
+ }
+
+ fprintf(outfl,"#!/bin/sh\n"
+"rm -f gshub.log\n"
+"if [ ! -f gs.pids ]\n"
+"then\n"
+"exit\n"
+"fi\n"
+"for pgid in `cat gs.pids`\n"
+"do\n"
+"kill -TERM -$pgid\n"
+"done\n"
+"sleep 1\n"
+"for pgid in `cat gs.pids`\n"
+"do\n"
+"kill -9 -$pgid\n"
+"done\n"
+"rm gs.pids\n");
+
+ fclose(outfl);
+ system("chmod +x stopit");
+
+//-----------------------------------------------
+
+/* For now disable support for virtual interfaces
+ outfl = fopen("set_vinterface_hash.bat", "w");
+ if(outfl==NULL){
+ fprintf(stderr,"Can't open set_vinterface_hash.bat for write, exiting.\n");
+ exit(0);
+ }
+
+// The format should be determined by an entry in the ifres.xml file,
+// but for now hardcode the only example I have.
+ for(ssi=base_vifaces.begin();ssi!=base_vifaces.end();++ssi){
+ if(rts_hload.count((*ssi))){
+ string iface_name = (*ssi);
+ string iface_number = "";
+ for(j=iface_name.size()-1; j>0 && iface_number == ""; j--){
+ if(isdigit(iface_name[j])){
+ iface_number = iface_name[j];
+ if(j>0 && isdigit(iface_name[j-1]))
+ iface_number = iface_name[j-1] + iface_number;
+ }
+ }
+
+ fprintf(outfl,"dagconfig -d%s -S hat_range=",iface_number.c_str());
+ vector<int> halloc = rts_hload[iface_name];
+ int prev_limit = 0;
+ for(j=0;j<halloc.size();++j){
+ if(j>0)
+ fprintf(outfl,":");
+ fprintf(outfl,"%d-%d",prev_limit,halloc[j]);
+ prev_limit = halloc[j];
+ }
+ fprintf(outfl,"\n");
+ }
+ }
+ fclose(outfl);
+ system("chmod +x set_vinterface_hash.bat");
+*/
+}
+
+// Code for implementing a local schema
+/*
+ table_list qpSchema;
+
+// Load the schemas of any LFTAs.
+ int l;
+ for(l=0;l<hfta_nbr;++l){
+ stream_query *sq0 = split_queries[l];
+ table_def *td = sq0->get_output_tabledef();
+ qpSchema.append_table(td);
+ }
+// load the schemas of any other ref'd tables.
+// (e.g., hftas)
+ vector<tablevar_t *> input_tbl_names = split_queries[hfta_nbr]->get_input_tables();
+ int ti;
+ for(ti=0;ti<input_tbl_names.size();++ti){
+ int tbl_ref = qpSchema.get_table_ref(input_tbl_names[ti]->get_schema_name());
+ if(tbl_ref < 0){
+ tbl_ref = Schema->get_table_ref(input_tbl_names[ti]->get_schema_name());
+ if(tbl_ref < 0){
+ fprintf(stderr,"ERROR file %s references table %s, which is not in the schema.\n",input_file_names[i].c_str(), (input_tbl_names[ti]->get_schema_name()).c_str());
+ exit(1);
+ }
+ qpSchema.append_table(Schema->get_table(tbl_ref));
+ }
+ }
+*/
+
+// Functions related to parsing.
+
+/*
+static int split_string(char *instr,char sep, char **words,int max_words){
+ char *loc;
+ char *str;
+ int nwords = 0;
+
+ str = instr;
+ words[nwords++] = str;
+ while( (loc = strchr(str,sep)) != NULL){
+ *loc = '\0';
+ str = loc+1;
+ if(nwords >= max_words){
+ fprintf(stderr,"Error in split_string, too many words discovered (max is %d)\n",max_words);
+ nwords = max_words-1;
+ }
+ words[nwords++] = str;
+ }
+
+ return(nwords);
+}
+
+*/
+