1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
16 #include<unistd.h> // for gethostname
19 #include "parse_fta.h"
20 #include "parse_schema.h"
21 #include "parse_ext_fcns.h"
22 #include"analyze_fta.h"
23 #include"query_plan.h"
24 #include"generate_lfta_code.h"
25 #include"stream_query.h"
26 #include"generate_utils.h"
28 #include"generate_nic_code.h"
37 #include <sys/types.h>
43 // to verify that some files exist.
44 #include <sys/types.h>
47 #include "parse_partn.h"
49 #include "print_plan.h"
51 // Interface to the xml parser
54 #include"field_list.h"
// Interface to the flex/bison-generated xml parser (used for ifres.xml and
// field_list.xml).  xmlParserparse() consumes the stream installed with
// xmlParser_setfileinput().
extern int xmlParserparse(void);
extern FILE *xmlParserin;
extern int xmlParserdebug;          // set nonzero to trace the xml parser

// Scratch state filled in by the xml parser's semantic actions.
std::vector<std::string> xml_attr_vec;
std::vector<std::string> xml_val_vec;
std::string xml_a, xml_v;
xml_t *xml_leaves = NULL;           // xml parse result; fed to field_list below

// Interface to the field list verifier; constructed from field_list.xml
// when that file is found in the config directory.
field_list *field_verifier = NULL;

#define TMPSTRLEN 1000              // size of the general-purpose line/string buffer

#define PATH_DELIM '/'              // path separator, used to strip directories from query file names

char tmp_schema_str[10000];

// Maximum delay between two heartbeats produced
// by a UDOP.  Used when it's not explicitly
// provided in the udop definition.
#define DEFAULT_UDOP_LIVENESS_TIMEOUT 5

// Default lfta hash table size, must be power of 2.
int DEFAULT_LFTA_HASH_TABLE_SIZE = 4096;

// Interface to FTA definition lexer and parser ...
extern int FtaParserparse(void);
extern FILE *FtaParserin;
extern int FtaParserdebug;          // set nonzero to trace the FTA parser

fta_parse_t *fta_parse_result;      // filled in by FtaParserparse()
var_defs_t *fta_parse_defines;

// Interface to external function lexer and parser ...
extern int Ext_fcnsParserparse(void);
extern FILE *Ext_fcnsParserin;
extern int Ext_fcnsParserdebug;     // set nonzero to trace the external-fcns parser

ext_fcn_list *Ext_fcns;             // parsed external_fcns.def definitions

// Interface to partition definition parser
extern int PartnParserparse();
partn_def_list_t *partn_parse_result = NULL;

// Forward declaration of local utility function that emits the Makefile and
// the runit/stopit scripts.
// NOTE(review): the parameter list below appears truncated in this view
// (no closing ");" and possibly missing parameters) -- confirm against the
// function's definition.
void generate_makefile(vector<string> &input_file_names, int nfiles,
	vector<string> &hfta_names, opview_set &opviews,
	vector<string> &machine_names,
	string schema_file_name,
	vector<string> &interface_names,
	ifq_t *ifdb, string &config_dir_path,
	map<string, vector<int> > &rts_hload

//static int split_string(char *instr,char sep, char **words,int max_words);

FILE *schema_summary_output = NULL; // schema_summary.txt; opened only when -f is given
131 // Dump schema summary
132 void dump_summary(stream_query *str){
133 fprintf(schema_summary_output,"%s\n",str->query_name.c_str());
135 table_def *sch = str->get_output_tabledef();
137 vector<field_entry *> flds = sch->get_fields();
139 for(f=0;f<flds.size();++f){
140 if(f>0) fprintf(schema_summary_output,"|");
141 fprintf(schema_summary_output,"%s",flds[f]->get_name().c_str());
143 fprintf(schema_summary_output,"\n");
144 for(f=0;f<flds.size();++f){
145 if(f>0) fprintf(schema_summary_output,"|");
146 fprintf(schema_summary_output,"%s",flds[f]->get_type().c_str());
148 fprintf(schema_summary_output,"\n");
string hostname;              // name of current host; filled from gethostname() unless overridden with -h

bool generate_stats = false;  // presumably set by -S (usage: "enable LFTA statistics (alters Makefile)") -- the option switch is not visible here, confirm
string root_path = "../..";   // path to the root of GS-lite; presumably overridden by -R (see usage string), confirm
158 int main(int argc, char **argv){
159 char tmpstr[TMPSTRLEN];
163 set<int>::iterator si;
165 vector<string> query_names; // for lfta.c registration
166 map<string, vector<int> > mach_query_names; // list queries of machine
167 vector<int> snap_lengths; // for lfta.c registration
168 vector<string> interface_names; // for lfta.c registration
169 vector<string> machine_names; // machine of interface
170 vector<bool> lfta_reuse_options; // for lfta.c registration
171 vector<int> lfta_liveness_timeouts; // fot qtree.xml generation
172 vector<string> hfta_names; // hfta cource code names, for
173 // creating make file.
174 vector<string> qnames; // ensure unique names
175 map<string, int> lfta_names; // keep track of unique lftas.
178 // set these to 1 to debug the parser
180 Ext_fcnsParserdebug = 0;
182 FILE *lfta_out; // lfta.c output.
183 FILE *fta_in; // input file
184 FILE *table_schemas_in; // source tables definition file
185 FILE *query_name_output; // query names
186 FILE *qtree_output; // interconnections of query nodes
188 // -------------------------------
189 // Handling of Input Arguments
190 // -------------------------------
191 char optstr[] = "BDpLC:l:HNQMf:PSh:n:cR:";
192 const char *usage_str = "Usage: %s [-B] [-D] [-p] [-L] [-N] [-H] [-Q] [-M] [-C <config directory>] [-l <library directory>] [-f] [-P] [-S] [-n n_virtual_interfaces] [-h hostname] [-R root_path] [schema_file] input_file [input file ...]\n"
193 "\t[-B] : debug only (don't create output files)\n"
194 "\t[-D] : distributed mode (will use cluster.ifq instead of local .ifq file)\n"
195 "\t[-p] : partitioned mode (will use partition information in partition.txt to drive query optimization)\n"
196 "\t[-L] : use live_hosts.txt file to restrict queries to a set of live hosts\n"
197 "\t[-C] : use <config directory> for definition files\n"
198 "\t[-l] : use <library directory> for library queries\n"
199 "\t[-N] : output query names in query_names.txt\n"
200 "\t[-H] : create HFTA only (no schema_file)\n"
201 "\t[-Q] : use query name for hfta suffix\n"
202 "\t[-M] : generate make file and runit, stopit scripts\n"
203 "\t[-S] : enable LFTA statistics (alters Makefile).\n"
204 "\t[-f] : Output schema summary to schema_summary.txt\n"
205 "\t[-P] : link with PADS\n"
206 "\t[-h] : override host name.\n"
207 "\t[-c] : clean out Makefile and hfta_*.cc first.\n"
208 "\t[-R] : path to root of GS-lite\n"
211 // parameters gathered from command line processing
212 string external_fcns_path;
213 // string internal_fcn_path;
214 string config_dir_path;
215 string library_path = "./";
216 vector<string> input_file_names;
217 string schema_file_name;
218 bool debug_only = false;
219 bool hfta_only = false;
220 bool output_query_names = false;
221 bool output_schema_summary=false;
222 bool numeric_hfta_flname = true;
223 bool create_makefile = false;
224 bool distributed_mode = false;
225 bool partitioned_mode = false;
226 bool use_live_hosts_file = false;
227 bool use_pads = false;
228 bool clean_make = false;
229 int n_virtual_interfaces = 1;
232 while((chopt = getopt(argc,argv,optstr)) != -1){
238 distributed_mode = true;
241 partitioned_mode = true;
244 use_live_hosts_file = true;
248 config_dir_path = string(optarg) + string("/");
252 library_path = string(optarg) + string("/");
255 output_query_names = true;
258 numeric_hfta_flname = false;
261 if(schema_file_name == ""){
266 output_schema_summary=true;
269 create_makefile=true;
290 n_virtual_interfaces = atoi(optarg);
291 if(n_virtual_interfaces < 1 || n_virtual_interfaces > 128){
292 fprintf(stderr,"Warning, %d virtual interfaces specified, valid values are between 1 and 128 inclusive\n",n_virtual_interfaces);
293 n_virtual_interfaces = 1;
298 fprintf(stderr,"Error, argument %c not recognized.\n",optopt);
299 fprintf(stderr,"%s\n", usage_str);
302 fprintf(stderr, "Argument was %c\n", optopt);
303 fprintf(stderr,"Invalid arguments\n");
304 fprintf(stderr,"%s\n", usage_str);
310 for (int i = 0; i < argc; ++i) {
311 if((schema_file_name == "") && !hfta_only){
312 schema_file_name = argv[i];
314 input_file_names.push_back(argv[i]);
318 if(input_file_names.size() == 0){
319 fprintf(stderr,"%s\n", usage_str);
324 string clean_cmd = "rm Makefile hfta_*.cc";
325 int clean_ret = system(clean_cmd.c_str());
327 fprintf(stderr,"Warning, return value %d when trying to clean out old source files.\n", clean_ret);
332 nic_prop_db *npdb = new nic_prop_db(config_dir_path);
334 // Open globally used file names.
336 // prepend config directory to schema file
337 schema_file_name = config_dir_path + schema_file_name;
338 external_fcns_path = config_dir_path + string("external_fcns.def");
339 string ifx_fname = config_dir_path + string("ifres.xml");
341 // Find interface query file(s).
343 gethostname(tmpstr,TMPSTRLEN);
346 hostname_len = strlen(tmpstr);
347 string ifq_fname = config_dir_path + (distributed_mode ? "cluster" : hostname) + string(".ifq");
348 vector<string> ifq_fls;
350 ifq_fls.push_back(ifq_fname);
353 // Get the field list, if it exists
354 string flist_fl = config_dir_path + "field_list.xml";
356 if((flf_in = fopen(flist_fl.c_str(), "r")) != NULL) {
357 fprintf(stderr,"Found field list file %s\n",flist_fl.c_str());
358 xml_leaves = new xml_t();
359 xmlParser_setfileinput(flf_in);
360 if(xmlParserparse()){
361 fprintf(stderr,"WARNING, could not parse field list file %s\n",flist_fl.c_str());
363 field_verifier = new field_list(xml_leaves);
368 if((table_schemas_in = fopen(schema_file_name.c_str(), "r")) == NULL) {
369 fprintf(stderr,"Can't open schema file %s\n%s\n",schema_file_name.c_str(),strerror(errno));
375 if(!(debug_only || hfta_only)){
376 if((lfta_out = fopen("lfta.c","w")) == NULL){
377 fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
383 // Get the output specification file.
385 // query, operator, operator_param, directory, bucketwidth, partitioning_se, n_partitions
386 string ospec_fl = "output_spec.cfg";
388 vector<ospec_str *> output_specs;
389 multimap<string, int> qname_to_ospec;
390 if((osp_in = fopen(ospec_fl.c_str(), "r")) != NULL) {
393 while(fgets(tmpstr,TMPSTRLEN,osp_in)){
395 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
397 // make operator type lowercase
399 for(tmpc=flds[1];*tmpc!='\0';++tmpc)
400 *tmpc = tolower(*tmpc);
402 ospec_str *tmp_ospec = new ospec_str();
403 tmp_ospec->query = flds[0];
404 tmp_ospec->operator_type = flds[1];
405 tmp_ospec->operator_param = flds[2];
406 tmp_ospec->output_directory = flds[3];
407 tmp_ospec->bucketwidth = atoi(flds[4]);
408 tmp_ospec->partitioning_flds = flds[5];
409 tmp_ospec->n_partitions = atoi(flds[6]);
410 qname_to_ospec.insert(pair<string,int>(tmp_ospec->query,output_specs.size()));
411 output_specs.push_back(tmp_ospec);
413 fprintf(stderr,"Warning, line %d corrupted in output_spec.cfg, has %d fields.\n",o_lineno,nflds);
418 fprintf(stderr,"output_spec.cfg not found. The query set has no output. exiting.\n");
423 string pspec_fl = "hfta_parallelism.cfg";
425 map<string, int> hfta_parallelism;
426 if((psp_in = fopen(pspec_fl.c_str(), "r")) != NULL){
429 while(fgets(tmpstr,TMPSTRLEN,psp_in)){
430 bool good_entry = true;
432 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
434 string hname = flds[0];
435 int par = atoi(flds[1]);
436 if(par <= 0 || par > n_virtual_interfaces){
437 fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must be between 1 and %d\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
440 if(good_entry && n_virtual_interfaces % par != 0){
441 fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must divide the number of virtual interfaces (%d), ignoring.\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
445 hfta_parallelism[hname] = par;
449 fprintf(stderr,"WARNING, no file %s found, using single copies of hftas.\n",pspec_fl.c_str());
453 // LFTA hash table sizes
454 string htspec_fl = "lfta_htsize.cfg";
455 FILE *htsp_in = NULL;
456 map<string, int> lfta_htsize;
457 if((htsp_in = fopen(htspec_fl.c_str(), "r")) != NULL){
460 while(fgets(tmpstr,TMPSTRLEN,htsp_in)){
461 bool good_entry = true;
463 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
465 string lfta_name = flds[0];
466 int htsz = atoi(flds[1]);
468 lfta_htsize[lfta_name] = htsz;
470 fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, htsize is %d, must be larger than 0.\n",o_lineno,htspec_fl.c_str(),htsz);
475 fprintf(stderr,"WARNING, no file %s found, using default LFTA hash table sizes.\n",htspec_fl.c_str());
478 // LFTA vitual interface hash split
479 string rtlspec_fl = "rts_load.cfg";
481 map<string, vector<int> > rts_hload;
482 if((rtl_in = fopen(rtlspec_fl.c_str(), "r")) != NULL){
487 while(fgets(tmpstr,TMPSTRLEN,rtl_in)){
488 bool good_entry = true;
492 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
494 iface_name = flds[0];
497 for(j=1;j<nflds;++j){
498 int h = atoi(flds[j]);
502 hload.push_back(cumm_h);
508 rts_hload[iface_name] = hload;
510 fprintf(stderr,"Warning, line %d of %s is malformed, skipping.\n",r_lineno, rtlspec_fl.c_str());
517 if(output_query_names){
518 if((query_name_output = fopen("query_names.txt","w")) == NULL){
519 fprintf(stderr,"Can't open output file %s\n%s\n","query_names.txt",strerror(errno));
524 if(output_schema_summary){
525 if((schema_summary_output = fopen("schema_summary.txt","w")) == NULL){
526 fprintf(stderr,"Can't open output file %s\n%s\n","schema_summary.txt",strerror(errno));
531 if((qtree_output = fopen("qtree.xml","w")) == NULL){
532 fprintf(stderr,"Can't open output file %s\n%s\n","qtree.xml",strerror(errno));
535 fprintf(qtree_output,"<?xml version='1.0' encoding='ISO-8859-1'?>\n");
536 fprintf(qtree_output,"<?xml-stylesheet type='text/xsl' href='qtree.xsl'?>\n");
537 fprintf(qtree_output,"<QueryNodes>\n");
540 // Get an initial Schema
543 // Parse the table schema definitions.
544 fta_parse_result = new fta_parse_t();
545 FtaParser_setfileinput(table_schemas_in);
546 if(FtaParserparse()){
547 fprintf(stderr,"Table schema parse failed.\n");
550 if(fta_parse_result->parse_type != TABLE_PARSE){
551 fprintf(stderr,"ERROR, file %s is not a table definition file.\n",schema_file_name.c_str());
554 Schema = fta_parse_result->tables;
556 // Process schema field inheritance
558 retval = Schema->unroll_tables(err_str);
560 fprintf(stderr,"Error processing schema filed inheritance:\n %s\n", err_str.c_str() );
564 // hfta only => we will try to fetch schemas from the registry.
565 // therefore, start off with an empty schema.
566 Schema = new table_list();
570 // Open and parse the external functions file.
571 Ext_fcnsParserin = fopen(external_fcns_path.c_str(),"r");
572 if(Ext_fcnsParserin == NULL){
573 fprintf(stderr,"Warning, can't find external functions definition file (external_fcns.def).\n");
574 Ext_fcns = new ext_fcn_list();
576 if(Ext_fcnsParserparse()){
577 fprintf(stderr,"Warning, error parsing external functions definition file (external_fcns.def).\n");
578 Ext_fcns = new ext_fcn_list();
581 if(Ext_fcns->validate_fcns(err_str)){
582 fprintf(stderr,"Error processing external functions definition file (external_fcns.def):\n%s\n",err_str.c_str());
586 // Open and parse the interface resources file.
587 // ifq_t *ifaces_db = new ifq_t();
589 // if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
590 // fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
591 // ifx_fname.c_str(), ierr.c_str());
594 // if(ifaces_db->load_ifqs(ifq_fname, ierr)){
595 // fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
596 // ifq_fname.c_str(), ierr.c_str());
601 // The LFTA code string.
602 // Put the standard preamble here.
603 // NOTE: the hash macros, fcns should go into the run time
604 map<string, string> lfta_val;
605 map<string, string> lfta_prefilter_val;
608 "#include <limits.h>\n\n"
609 "#include \"rts.h\"\n"
610 "#include \"fta.h\"\n"
611 "#include \"lapp.h\"\n"
612 "#include \"rts_udaf.h\"\n\n"
614 "#define IS_FILLED(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> ((bucket & 15)<<1)))\n"
615 "#define IS_NEW(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> (((bucket & 15) << 1) + 1)))\n"
616 "#define SET_EMPTY(bitmap,bucket) (bitmap[bucket >> 4] &= (~(0x80000000 >> ((bucket & 15)<<1))))\n"
617 "#define SET_FILLED_AND_NEW(bitmap,bucket) (bitmap[bucket >> 4] |= (0xC0000000 >> ((bucket & 15)<<1)))\n"
620 "gs_uint64_t (*lfta_prefilter)(void *pkt) = NULL; // set at fta_init\n"
622 "#define SLOT_FILLED 0x04\n"
623 "#define SLOT_GEN_BITS 0x03\n"
624 "#define SLOT_HASH_BITS 0xfffffff8\n"
625 "#define SET_BF_BIT(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num)) + (bf_index)) >> 3] |= (0x80 >> ((((bucket) * (bf_num)) + (bf_index)) & 7)))\n"
626 "#define IS_BF_SET(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num))+(bf_index)) >> 3] & (0x80 >> ((((bucket) * (bf_num))+(bf_index)) & 7)))\n"
627 "#define SET_BF_EMPTY(bf_table, bf_num, bf_index, bucket) (bf_table[((bucket * bf_num)+bf_index) >> 3] &= (~0x80 >> (((bucket * bf_num)+bf_index) & 7)))\n"
630 "#define lfta_BOOL_to_hash(x) (x)\n"
631 "#define lfta_USHORT_to_hash(x) (x)\n"
632 "#define lfta_UINT_to_hash(x) (x)\n"
633 "#define lfta_IP_to_hash(x) (x)\n"
634 "#define lfta_IPV6_to_hash(x) ( (x.v[0]) ^ (x.v[1]) ^ (x.v[2]) ^ (x.v[3]))\n"
635 "#define lfta_INT_to_hash(x) (gs_uint32_t)(x)\n"
636 "#define lfta_ULLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
637 "#define lfta_LLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
638 "#define lfta_FLOAT_to_hash(x) ( (( ((gs_uint64_t)(x)) >>32)&0xffffffff) ^ (((gs_uint64_t)(x))&0xffffffff) )\n"
639 "static gs_uint32_t lfta_V_STR_to_hash(struct gs_string x){\n"
640 " gs_uint32_t i,ret=0,tmp_sum = 0;\n"
641 " for(i=0;i<x.length;++i){\n"
642 " tmp_sum |= (x.data[i]) << (8*(i%4));\n"
648 " if((i%4)!=0) ret ^=tmp_sum;\n"
654 //////////////////////////////////////////////////////////////////
655 ///// Get all of the query parse trees
659 int hfta_count = 0; // for numeric suffixes to hfta .cc files
661 //---------------------------
662 // Global info needed for post processing.
664 // Set of operator views ref'd in the query set.
666 // lftas on a per-machine basis.
667 map<string, vector<stream_query *> > lfta_mach_lists;
668 int nfiles = input_file_names.size();
669 vector<stream_query *> hfta_list; // list of hftas.
670 map<string, stream_query *> sq_map; // map from query name to stream query.
673 //////////////////////////////////////////
675 // Open and parse the interface resources file.
676 ifq_t *ifaces_db = new ifq_t();
678 if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
679 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
680 ifx_fname.c_str(), ierr.c_str());
683 if(ifaces_db->load_ifqs(ifq_fls[0], ierr)){
684 fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
685 ifq_fls[0].c_str(), ierr.c_str());
689 map<string, string> qname_to_flname; // for detecting duplicate query names
693 // Parse the files to create a vector of parse trees.
694 // Load qnodes with information to perform a topo sort
695 // based on query dependencies.
696 vector<query_node *> qnodes; // for topo sort.
697 map<string,int> name_node_map; // map query name to qnodes entry
698 for(i=0;i<input_file_names.size();i++){
700 if((fta_in = fopen(input_file_names[i].c_str(), "r")) == NULL) {
701 fprintf(stderr,"Can't open input file %s\n%s",input_file_names[i].c_str(),strerror(errno));
704 fprintf(stderr,"Parsing file %s\n",input_file_names[i].c_str());
706 // Parse the FTA query
707 fta_parse_result = new fta_parse_t();
708 FtaParser_setfileinput(fta_in);
709 if(FtaParserparse()){
710 fprintf(stderr,"FTA parse failed.\n");
713 if(fta_parse_result->parse_type != QUERY_PARSE){
714 fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
718 // returns a list of parse trees
719 vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
720 for(p=0;p<qlist.size();++p){
721 table_exp_t *fta_parse_tree = qlist[p];
722 // query_parse_trees.push_back(fta_parse_tree);
724 // compute the default name -- extract from query name
725 strcpy(tmpstr,input_file_names[i].c_str());
726 char *qname = strrchr(tmpstr,PATH_DELIM);
731 char *qname_end = strchr(qname,'.');
732 if(qname_end != NULL) *qname_end = '\0';
733 string qname_str = qname;
734 string imputed_qname = impute_query_name(fta_parse_tree, qname_str);
736 // Deternmine visibility. Should I be attaching all of the output methods?
737 if(qname_to_ospec.count(imputed_qname)>0)
738 fta_parse_tree->set_visible(true);
740 fta_parse_tree->set_visible(false);
743 // Create a manipulable repesentation of the parse tree.
744 // the qnode inherits the visibility assigned to the parse tree.
745 int pos = qnodes.size();
746 qnodes.push_back( new query_node(pos, imputed_qname, input_file_names[i], fta_parse_tree ));
747 name_node_map[ qnodes[pos]->name ] = pos;
748 //printf("name_node_map[ %s ] = %d\n",qnodes[pos]->name.c_str(),pos);
749 // qnames.push_back(impute_query_name(fta_parse_tree, qname_str));
750 // qfiles.push_back(i);
752 // Check for duplicate query names
753 // NOTE : in hfta-only generation, I should
754 // also check with the names of the registered queries.
755 if(qname_to_flname.count(qnodes[pos]->name) > 0){
756 fprintf(stderr,"ERROR duplicate query name %s in files %s and %s.\n",
757 qnodes[pos]->name.c_str(), (qname_to_flname[qnodes[pos]->name]).c_str(), input_file_names[i].c_str());
760 if(Schema->find_tbl(qnodes[pos]->name) >= 0){
761 fprintf(stderr,"ERROR duplicate query name %s (file %s) was already defined as a PROTOCOL.\n",
762 qnodes[pos]->name.c_str(), input_file_names[i].c_str());
765 qname_to_flname[qnodes[pos]->name] = input_file_names[i].c_str();
771 // Add the library queries
774 for(pos=0;pos<qnodes.size();++pos){
776 for(fi = 0;fi<qnodes[pos]->refd_tbls.size();++fi){
777 string src_tbl = qnodes[pos]->refd_tbls[fi];
778 if(qname_to_flname.count(src_tbl) == 0){
779 int last_sep = src_tbl.find_last_of('/');
780 if(last_sep != string::npos){
781 fprintf(stderr,"Finding library query %s for query %s\n",src_tbl.c_str(),qnodes[pos]->name.c_str());
782 string target_qname = src_tbl.substr(last_sep+1);
783 string qpathname = library_path + src_tbl + ".gsql";
784 if((fta_in = fopen(qpathname.c_str(), "r")) == NULL) {
785 fprintf(stderr,"Can't open library file %s, referenced by query %s in file %s\n\t%s\n",qpathname.c_str(),qnodes[pos]->name.c_str(), qname_to_flname[qnodes[pos]->name].c_str(), strerror(errno));
787 fprintf(stderr,"After exit\n");
789 fprintf(stderr,"Parsing file %s\n",qpathname.c_str());
790 // Parse the FTA query
791 fta_parse_result = new fta_parse_t();
792 FtaParser_setfileinput(fta_in);
793 if(FtaParserparse()){
794 fprintf(stderr,"FTA parse failed.\n");
797 if(fta_parse_result->parse_type != QUERY_PARSE){
798 fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
802 map<string, int> local_query_map;
803 vector<string> local_query_names;
804 vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
805 for(p=0;p<qlist.size();++p){
806 table_exp_t *fta_parse_tree = qlist[p];
807 fta_parse_tree->set_visible(false); // assumed to not produce output
808 string imputed_qname = impute_query_name(fta_parse_tree, target_qname);
809 if(imputed_qname == target_qname)
810 imputed_qname = src_tbl;
811 if(local_query_map.count(imputed_qname)>0){
812 fprintf(stderr,"ERROR, multiple queries named %s in library file %s\n",imputed_qname.c_str(), qpathname.c_str());
815 local_query_map[ imputed_qname ] = p;
816 local_query_names.push_back(imputed_qname);
819 if(local_query_map.count(src_tbl)==0){
820 fprintf(stderr,"ERROR, library query file %s has no query named %s\n",qpathname.c_str(),target_qname.c_str());
824 vector<int> worklist;
825 set<int> added_queries;
826 vector<query_node *> new_qnodes;
827 worklist.push_back(local_query_map[target_qname]);
828 added_queries.insert(local_query_map[target_qname]);
830 int qpos = qnodes.size();
831 for(qq=0;qq<worklist.size();++qq){
832 int q_id = worklist[qq];
833 query_node *new_qnode = new query_node(qpos+qq, local_query_names[q_id], qpathname, qlist[q_id] );
834 new_qnodes.push_back( new_qnode);
835 vector<string> refd_tbls = new_qnode->refd_tbls;
837 for(ff = 0;ff<refd_tbls.size();++ff){
838 if(local_query_map.count(refd_tbls[ff])>0 && added_queries.count(local_query_map[refd_tbls[ff]])==0){
840 if(name_node_map.count(refd_tbls[ff])>0){
841 fprintf(stderr,"ERROR, query %s occurs both in the regular query set, file %s, and in library file %s\n",refd_tbls[ff].c_str(), qname_to_flname[refd_tbls[ff]].c_str(), qpathname.c_str() );
844 worklist.push_back(local_query_map[refd_tbls[ff]]);
850 for(qq=0;qq<new_qnodes.size();++qq){
851 int qpos = qnodes.size();
852 qnodes.push_back(new_qnodes[qq]);
853 name_node_map[qnodes[qpos]->name ] = qpos;
854 qname_to_flname[qnodes[qpos]->name ] = qpathname;
868 //---------------------------------------
873 string udop_missing_sources;
874 for(i=0;i<qnodes.size();++i){
876 for(fi = 0;fi<qnodes[i]->refd_tbls.size();++fi){
877 int sid = Schema->find_tbl(qnodes[i]->refd_tbls[fi]);
879 if(Schema->get_schema_type(sid) == OPERATOR_VIEW_SCHEMA){
880 if(name_node_map.count(qnodes[i]->refd_tbls[fi]) == 0){
881 int pos = qnodes.size();
882 qnodes.push_back( new query_node(pos, qnodes[i]->refd_tbls[fi], Schema));
883 name_node_map[ qnodes[pos]->name ] = pos;
884 qnodes[pos]->is_externally_visible = false; // its visible
885 // Need to mark the source queries as visible.
887 string missing_sources = "";
888 for(si=0;si<qnodes[pos]->refd_tbls.size();++si){
889 string src_tbl = qnodes[pos]->refd_tbls[si];
890 if(name_node_map.count(src_tbl)==0){
891 missing_sources += src_tbl + " ";
894 if(missing_sources != ""){
895 udop_missing_sources += "\tUDOP "+qnodes[pos]->name+" references undefined tables "+missing_sources+"\n";
902 if(udop_missing_sources != ""){
903 fprintf(stderr,"ERROR, User Defined OPerators reference source tables that are not part of the query set:\n%s",udop_missing_sources.c_str());
909 ////////////////////////////////////////////////////////////////////
910 /// Check parse trees to verify that some
911 /// global properties are met :
912 /// if q1 reads from q2, then
913 /// q2 is processed before q1
914 /// q1 can supply q2's parameters
915 /// Verify there is no cycle in the reads-from graph.
917 // Compute an order in which to process the
920 // Start by building the reads-from lists.
923 for(i=0;i<qnodes.size();++i){
925 vector<string> refd_tbls = qnodes[i]->refd_tbls;
926 for(fi = 0;fi<refd_tbls.size();++fi){
927 if(name_node_map.count(refd_tbls[fi])>0){
928 //printf("query %d (%s) reads from %s (%d)\n", i, qnodes[i]->name.c_str(),refd_tbls[fi].c_str(),name_node_map[refd_tbls[fi]]);
929 (qnodes[i]->reads_from).insert(name_node_map[refd_tbls[fi]]);
935 // If one query reads the result of another,
936 // check for parameter compatibility. Currently it must
937 // be an exact match. I will move to requiring
938 // containment after re-ordering, but will require
939 // some analysis for code generation which is not
941 //printf("There are %d query nodes.\n",qnodes.size());
944 for(i=0;i<qnodes.size();++i){
945 vector<var_pair_t *> target_params = qnodes[i]->params;
946 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
947 vector<var_pair_t *> source_params = qnodes[(*si)]->params;
948 if(target_params.size() != source_params.size()){
949 fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
953 for(p=0;p<target_params.size();++p){
954 if(! (target_params[p]->name == source_params[p]->name &&
955 target_params[p]->val == source_params[p]->val ) ){
956 fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
965 // Start by counting inedges.
966 for(i=0;i<qnodes.size();++i){
967 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
968 qnodes[(*si)]->n_consumers++;
972 // The roots are the nodes with indegree zero.
974 for(i=0;i<qnodes.size();++i){
975 if(qnodes[i]->n_consumers == 0){
976 if(qnodes[i]->is_externally_visible == false){
977 fprintf(stderr,"WARNING: query %s (file %s) is a root query but it isn't visible. Ignored.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str());
983 // Remove the parts of the subtree that produce no output.
984 set<int> valid_roots;
985 set<int> discarded_nodes;
987 while(roots.size() >0){
988 for(si=roots.begin();si!=roots.end();++si){
989 if(qnodes[(*si)]->is_externally_visible){
990 valid_roots.insert((*si));
992 discarded_nodes.insert((*si));
993 set<int>::iterator sir;
994 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
995 qnodes[(*sir)]->n_consumers--;
996 if(qnodes[(*sir)]->n_consumers == 0)
997 candidates.insert( (*sir));
1004 roots = valid_roots;
1005 if(discarded_nodes.size()>0){
1006 fprintf(stderr,"Warning, the following queries were discarded because they produce no output:\n");
1008 for(si=discarded_nodes.begin();si!=discarded_nodes.end();++si){
1009 if(di>0 && (di%8)==0) fprintf(stderr,"\n");
1011 fprintf(stderr," %s",qnodes[(*si)]->name.c_str());
1013 fprintf(stderr,"\n");
1016 // Compute the sources_to set, ignoring discarded nodes.
1017 for(i=0;i<qnodes.size();++i){
1018 if(discarded_nodes.count(i)==0)
1019 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1020 qnodes[(*si)]->sources_to.insert(i);
1025 // Find the nodes that are shared by multiple visible subtrees.
1026 // THe roots become inferred visible nodes.
1028 // Find the visible nodes.
1029 vector<int> visible_nodes;
1030 for(i=0;i<qnodes.size();i++){
1031 if(qnodes[i]->is_externally_visible){
1032 visible_nodes.push_back(i);
1036 // Find UDOPs referenced by visible nodes.
1038 for(i=0;i<visible_nodes.size();++i){
1039 workq.push_back(visible_nodes[i]);
1041 while(!workq.empty()){
1042 int node = workq.front();
1044 set<int>::iterator children;
1045 if(qnodes[node]->is_udop && qnodes[node]->is_externally_visible == false){
1046 qnodes[node]->is_externally_visible = true;
1047 visible_nodes.push_back(node);
1048 for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1049 if(qnodes[(*children)]->is_externally_visible == false){
1050 qnodes[(*children)]->is_externally_visible = true;
1051 visible_nodes.push_back((*children));
1055 for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1056 workq.push_back((*children));
1063 for(i=0;i<qnodes.size();i++){
1064 qnodes[i]->subtree_roots.clear();
1067 // Walk the tree defined by a visible node, not descending into
1068 // subtrees rooted by a visible node. Mark the node visited with
1069 // the visible node ID.
1070 for(i=0;i<visible_nodes.size();++i){
1072 vroots.insert(visible_nodes[i]);
1073 while(vroots.size()>0){
1074 for(si=vroots.begin();si!=vroots.end();++si){
1075 qnodes[(*si)]->subtree_roots.insert(visible_nodes[i]);
1077 set<int>::iterator sir;
1078 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1079 if(! qnodes[(*sir)]->is_externally_visible){
1080 candidates.insert( (*sir));
1084 vroots = candidates;
1088 // Find the nodes in multiple visible node subtrees, but with no parent
1089 // that has is in multile visible node subtrees. Mark these as inferred visible nodes.
1090 done = true; // until proven otherwise
1091 for(i=0;i<qnodes.size();i++){
1092 if(qnodes[i]->subtree_roots.size()>1){
1093 bool is_new_root = true;
1094 set<int>::iterator sir;
1095 for(sir=qnodes[i]->sources_to.begin(); sir!=qnodes[i]->sources_to.end();++sir){
1096 if(qnodes[(*sir)]->subtree_roots.size()>1)
1097 is_new_root = false;
1100 qnodes[i]->is_externally_visible = true;
1101 qnodes[i]->inferred_visible_node = true;
1102 visible_nodes.push_back(i);
1113 // get visible nodes in topo ordering.
1114 // for(i=0;i<qnodes.size();i++){
1115 // qnodes[i]->n_consumers = qnodes[i]->sources_to.size();
1117 vector<int> process_order;
1118 while(roots.size() >0){
1119 for(si=roots.begin();si!=roots.end();++si){
1120 if(discarded_nodes.count((*si))==0){
1121 process_order.push_back( (*si) );
1123 set<int>::iterator sir;
1124 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1125 qnodes[(*sir)]->n_consumers--;
1126 if(qnodes[(*sir)]->n_consumers == 0)
1127 candidates.insert( (*sir));
1135 //printf("process_order.size() =%d\n",process_order.size());
1137 // Search for cyclic dependencies
1139 for(i=0;i<qnodes.size();++i){
1140 if(discarded_nodes.count(i)==0 && qnodes[i]->n_consumers > 0){
1141 if(found_dep.size() != 0) found_dep += ", ";
1142 found_dep += "query "+qnodes[i]->name+" (file "+qnodes[i]->file+")";
1145 if(found_dep.size()>0){
1146 fprintf(stderr,"ERROR, the following queries contain a cyclic reads-from dependency:\n%s\n",found_dep.c_str());
1150 // Get a list of query sets, in the order to be processed.
1151 // Start at visible root and do bfs.
1152 // The query set includes queries referenced indirectly,
1153 // as sources for user-defined operators. These are needed
1154 // to ensure that they are added to the schema, but are not part
1155 // of the query tree.
1157 // stream_node_sets contains queries reachable only through the
1158 // FROM clause, so I can tell which queries to add to the stream
1159 // query. (DISABLED, UDOPS are integrated, does this cause problems?)
1161 // NOTE: this code works because in order for data to be
1162 // read by multiple hftas, the node must be externally visible.
1163 // But visible nodes define roots of process sets.
1164 // internally visible nodes can feed data only
1165 // to other nodes in the same query file.
1166 // Therefore, any access can be restricted to a file,
1167 // hfta output sharing is done only on roots
1168 // never on interior nodes.
1173 // Compute the base collection of hftas.
1174 vector<hfta_node *> hfta_sets;
1175 map<string, int> hfta_name_map;
1176 // vector< vector<int> > process_sets;
1177 // vector< set<int> > stream_node_sets;
1178 reverse(process_order.begin(), process_order.end()); // get listing in reverse order.
1179 // i.e. process leaves 1st.
1180 for(i=0;i<process_order.size();++i){
1181 if(qnodes[process_order[i]]->is_externally_visible == true){
1182 //printf("Visible.\n");
1183 int root = process_order[i];
1184 hfta_node *hnode = new hfta_node();
1185 hnode->name = qnodes[root]-> name;
1186 hnode->source_name = qnodes[root]-> name;
1187 hnode->is_udop = qnodes[root]->is_udop;
1188 hnode->inferred_visible_node = qnodes[root]->inferred_visible_node;
1190 vector<int> proc_list; proc_list.push_back(root);
1191 // Ensure that nodes are added only once.
1192 set<int> proc_set; proc_set.insert(root);
1193 roots.clear(); roots.insert(root);
1195 while(roots.size()>0){
1196 for(si=roots.begin();si!=roots.end();++si){
1197 //printf("Processing root %d\n",(*si));
1198 set<int>::iterator sir;
1199 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1200 //printf("reads fom %d\n",(*sir));
1201 if(qnodes[(*sir)]->is_externally_visible==false){
1202 candidates.insert( (*sir) );
1203 if(proc_set.count( (*sir) )==0){
1204 proc_set.insert( (*sir) );
1205 proc_list.push_back( (*sir) );
1214 reverse(proc_list.begin(), proc_list.end());
1215 hnode->query_node_indices = proc_list;
1216 hfta_name_map[hnode->name] = hfta_sets.size();
1217 hfta_sets.push_back(hnode);
1221 // Compute the reads_from / sources_to graphs for the hftas.
1223 for(i=0;i<hfta_sets.size();++i){
1224 hfta_node *hnode = hfta_sets[i];
1225 for(q=0;q<hnode->query_node_indices.size();q++){
1226 query_node *qnode = qnodes[hnode->query_node_indices[q]];
1227 for(s=0;s<qnode->refd_tbls.size();++s){
1228 if(hfta_name_map.count(qnode->refd_tbls[s])){
1229 int other_hfta = hfta_name_map[qnode->refd_tbls[s]];
1230 hnode->reads_from.insert(other_hfta);
1231 hfta_sets[other_hfta]->sources_to.insert(i);
1237 // Compute a topological sort of the hfta_sets.
1239 vector<int> hfta_topsort;
1241 int hnode_srcs[hfta_sets.size()];
1242 for(i=0;i<hfta_sets.size();++i){
1244 if(hfta_sets[i]->sources_to.size() == 0)
1248 while(! workq.empty()){
1249 int node = workq.front();
1251 hfta_topsort.push_back(node);
1252 set<int>::iterator stsi;
1253 for(stsi=hfta_sets[node]->reads_from.begin();stsi!=hfta_sets[node]->reads_from.end();++stsi){
1254 int parent = (*stsi);
1255 hnode_srcs[parent]++;
1256 if(hnode_srcs[parent] == hfta_sets[parent]->sources_to.size()){
1257 workq.push_back(parent);
1262 // Decorate hfta nodes with the level of parallelism given as input.
1264 map<string, int>::iterator msii;
1265 for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1266 string hfta_name = (*msii).first;
1267 int par = (*msii).second;
1268 if(hfta_name_map.count(hfta_name) > 0){
1269 hfta_sets[ hfta_name_map[hfta_name] ]->n_parallel = par;
1271 fprintf(stderr,"Warning, hfta_parallelism.cfg has an entry for %s, but its not a hfta.\n",hfta_name.c_str());
1275 // Propagate levels of parallelism: children should have a level of parallelism
1276 // as large as any of its parents. Adjust children upwards to compensate.
1277 // Start at parents and adjust children, auto-propagation will occur.
1279 for(i=hfta_sets.size()-1;i>=0;i--){
1280 set<int>::iterator stsi;
1281 for(stsi=hfta_sets[i]->reads_from.begin();stsi!=hfta_sets[i]->reads_from.end();++stsi){
1282 if(hfta_sets[i]->n_parallel > hfta_sets[ (*stsi) ]->n_parallel){
1283 hfta_sets[ (*stsi) ]->n_parallel = hfta_sets[i]->n_parallel;
1288 // Before all the name mangling, check if there are any output_spec.cfg
1289 // or hfta_parallelism.cfg entries that do not have a matching query.
1291 string dangling_ospecs = "";
1292 for(msii=qname_to_ospec.begin();msii!=qname_to_ospec.end();++msii){
1293 string oq = (*msii).first;
1294 if(hfta_name_map.count(oq) == 0){
1295 dangling_ospecs += " "+(*msii).first;
1298 if(dangling_ospecs!=""){
1299 fprintf(stderr,"WARNING, the following entries in output_spec.cfg don't have a matching query: %s\n",dangling_ospecs.c_str());
1302 string dangling_par = "";
1303 for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1304 string oq = (*msii).first;
1305 if(hfta_name_map.count(oq) == 0){
1306 dangling_par += " "+(*msii).first;
1309 if(dangling_par!=""){
1310 fprintf(stderr,"WARNING, the following entries in hfta_parallelism.cfg don't have a matching query: %s\n",dangling_par.c_str());
1315 // Replicate parallelized hftas. Do __copyX name mangling. Adjust
1316 // FROM clauses: retarget any name which is an internal node, and
1317 // any which is in hfta_sets (and which is parallelized). Add Merge nodes
1318 // when the source hfta has more parallelism than the target node.
1319 // Add new nodes to qnodes and hfta_nodes with the appropriate linkages.
1322 int n_original_hfta_sets = hfta_sets.size();
1323 for(i=0;i<n_original_hfta_sets;++i){
1324 if(hfta_sets[i]->n_parallel > 1){
1325 hfta_sets[i]->do_generation =false; // set the deletion flag for this entry.
1326 set<string> local_nodes; // names of query nodes in the hfta.
1327 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1328 local_nodes.insert(qnodes[ hfta_sets[i]->query_node_indices[h] ]->name);
1331 for(p=0;p<hfta_sets[i]->n_parallel;++p){
1332 string mangler = "__copy"+int_to_string(p);
1333 hfta_node *par_hfta = new hfta_node();
1334 par_hfta->name = hfta_sets[i]->name + mangler;
1335 par_hfta->source_name = hfta_sets[i]->name;
1336 par_hfta->is_udop = hfta_sets[i]->is_udop;
1337 par_hfta->inferred_visible_node = hfta_sets[i]->inferred_visible_node;
1338 par_hfta->n_parallel = hfta_sets[i]->n_parallel;
1339 par_hfta->parallel_idx = p;
1341 map<string, int> par_qnode_map; // qnode name-to-idx, aids dependency tracking.
1344 if(hfta_sets[i]->is_udop){
1345 int root = hfta_sets[i]->query_node_indices[0];
1347 string unequal_par_sources;
1348 set<int>::iterator rfsii;
1349 for(rfsii=hfta_sets[i]->reads_from.begin();rfsii!=hfta_sets[i]->reads_from.end();++rfsii){
1350 if(hfta_sets[(*rfsii)]->n_parallel != par_hfta->n_parallel){
1351 unequal_par_sources = hfta_sets[(*rfsii)]->name+" ("+int_to_string(hfta_sets[(*rfsii)]->n_parallel)+") ";
1354 if(unequal_par_sources != ""){
1355 fprintf(stderr,"ERROR, UDOP %s has parallelism %d, but some of its sources have a different parallelism: %s\n",hfta_sets[i]->name.c_str(), hfta_sets[i]->n_parallel, unequal_par_sources.c_str());
1360 vector<string> new_sources;
1361 for(rti=0;rti<qnodes[root]->refd_tbls.size();++rti){
1362 new_sources.push_back(qnodes[root]->refd_tbls[rti]+mangler);
1365 query_node *new_qn = new query_node(qnodes.size(), qnodes[root]->name, Schema);
1366 new_qn->name += mangler;
1367 new_qn->mangler = mangler;
1368 new_qn->refd_tbls = new_sources;
1369 par_hfta->query_node_indices.push_back(qnodes.size());
1370 par_qnode_map[new_qn->name] = qnodes.size();
1371 name_node_map[ new_qn->name ] = qnodes.size();
1372 qnodes.push_back(new_qn);
1374 // regular query node
1375 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1376 int hqn_idx = hfta_sets[i]->query_node_indices[h];
1377 table_exp_t *dup_pt = dup_table_exp(qnodes[hqn_idx]->parse_tree);
1378 // rehome the from clause on mangled names.
1379 // create merge nodes as needed for external sources.
1380 for(f=0;f<dup_pt->fm->tlist.size();++f){
1381 if(local_nodes.count(dup_pt->fm->tlist[f]->schema_name)>0){
1382 dup_pt->fm->tlist[f]->schema_name += mangler;
1383 }else if(hfta_name_map.count(dup_pt->fm->tlist[f]->schema_name)>0){
1384 // Ref's an external HFTA. No parallelism => leave as is. Else, if level of parallelism of the two hftas is the same, rename by the mangler. Else, there must be more sources, so create a merge node.
1385 int other_hidx = hfta_name_map[dup_pt->fm->tlist[f]->schema_name];
1386 if(par_hfta->n_parallel == hfta_sets[other_hidx]->n_parallel){
1387 dup_pt->fm->tlist[f]->schema_name += mangler;
1389 vector<string> src_tbls;
1390 int stride = hfta_sets[other_hidx]->n_parallel / par_hfta->n_parallel;
1392 fprintf(stderr,"INTERNAL ERROR, parent hfta %s has a larger parallelism than its child %s\n",par_hfta->name.c_str(), hfta_sets[other_hidx]->name.c_str());
1395 for(s=0;s<stride;++s){
1396 string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s+stride*p);
1397 src_tbls.push_back(ext_src_name);
1399 table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1400 string merge_node_name = qnodes[hqn_idx]->name+"__copy"+int_to_string(p)+"__merge"+int_to_string(f);
1401 dup_pt->fm->tlist[f]->schema_name = merge_node_name;
1402 // Make a qnode to represent the new merge node
1403 query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1404 qn_pt->refd_tbls = src_tbls;
1405 qn_pt->is_udop = false;
1406 qn_pt->is_externally_visible = false;
1407 qn_pt->inferred_visible_node = false;
1408 par_hfta->query_node_indices.push_back(qnodes.size());
1409 par_qnode_map[merge_node_name] = qnodes.size();
1410 name_node_map[ merge_node_name ] = qnodes.size();
1411 qnodes.push_back(qn_pt);
1415 query_node *new_qn = new query_node(qnodes.size(),qnodes[hqn_idx]->name+"__copy"+int_to_string(p),qnodes[hqn_idx]->file,dup_pt);
1416 for(f=0;f<dup_pt->fm->tlist.size();++f){
1417 new_qn->refd_tbls.push_back(dup_pt->fm->tlist[f]->schema_name);
1419 new_qn->params = qnodes[hqn_idx]->params;
1420 new_qn->is_udop = false;
1421 new_qn->is_externally_visible = qnodes[hqn_idx]->is_externally_visible;
1422 new_qn->inferred_visible_node = qnodes[hqn_idx]->inferred_visible_node;
1423 par_hfta->query_node_indices.insert(par_hfta->query_node_indices.begin(),qnodes.size());
1424 par_qnode_map[new_qn->name] = qnodes.size();
1425 name_node_map[ new_qn->name ] = qnodes.size();
1426 qnodes.push_back(new_qn);
1429 hfta_name_map[par_hfta->name] = hfta_sets.size();
1430 hfta_sets.push_back(par_hfta);
1433 // This hfta isn't being parallelized, but add merge nodes for any parallelized
1435 if(!hfta_sets[i]->is_udop){
1436 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1437 int hqn_idx = hfta_sets[i]->query_node_indices[h];
1438 for(f=0;f<qnodes[hqn_idx]->parse_tree->fm->tlist.size();++f){
1439 if(hfta_name_map.count(qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name)>0){
1440 // Ref's an external HFTA. No parallelism => leave as is. Else, if level of parallelism of the two hftas is the same, rename by the mangler. Else, there must be more sources, so create a merge node.
1441 int other_hidx = hfta_name_map[qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name];
1442 if(hfta_sets[i]->n_parallel != hfta_sets[other_hidx]->n_parallel){
1443 vector<string> src_tbls;
1444 for(s=0;s<hfta_sets[other_hidx]->n_parallel;++s){
1445 string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s);
1446 src_tbls.push_back(ext_src_name);
1448 table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1449 string merge_node_name = qnodes[hqn_idx]->name+"__merge"+int_to_string(f);
1450 qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name = merge_node_name;
1451 // Make a qnode to represent the new merge node
1452 query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1453 qn_pt->refd_tbls = src_tbls;
1454 qn_pt->is_udop = false;
1455 qn_pt->is_externally_visible = false;
1456 qn_pt->inferred_visible_node = false;
1457 hfta_sets[i]->query_node_indices.insert(hfta_sets[i]->query_node_indices.begin(),qnodes.size());
1458 name_node_map[ merge_node_name ] = qnodes.size();
1459 qnodes.push_back(qn_pt);
1468 // Rebuild the reads_from / sources_to lists in the qnodes
1469 for(q=0;q<qnodes.size();++q){
1470 qnodes[q]->reads_from.clear();
1471 qnodes[q]->sources_to.clear();
1473 for(q=0;q<qnodes.size();++q){
1474 for(s=0;s<qnodes[q]->refd_tbls.size();++s){
1475 if(name_node_map.count(qnodes[q]->refd_tbls[s])>0){
1476 int rf = name_node_map[qnodes[q]->refd_tbls[s]];
1477 qnodes[q]->reads_from.insert(rf);
1478 qnodes[rf]->sources_to.insert(q);
1483 // Rebuild the reads_from / sources_to lists in hfta_sets
1484 for(q=0;q<hfta_sets.size();++q){
1485 hfta_sets[q]->reads_from.clear();
1486 hfta_sets[q]->sources_to.clear();
1488 for(q=0;q<hfta_sets.size();++q){
1489 for(s=0;s<hfta_sets[q]->query_node_indices.size();++s){
1490 int node = hfta_sets[q]->query_node_indices[s];
1491 set<int>::iterator rfsii;
1492 for(rfsii=qnodes[node]->reads_from.begin();rfsii!=qnodes[node]->reads_from.end();++rfsii){
1493 if(hfta_name_map.count(qnodes[(*rfsii)]->name)>0){
1494 hfta_sets[q]->reads_from.insert(hfta_name_map[qnodes[(*rfsii)]->name]);
1495 hfta_sets[hfta_name_map[qnodes[(*rfsii)]->name]]->sources_to.insert(q);
1502 for(q=0;q<qnodes.size();++q){
1503 printf("qnode %d reads-from %d:",q,qnodes[q]->reads_from.size());
1504 set<int>::iterator rsii;
1505 for(rsii=qnodes[q]->reads_from.begin();rsii!=qnodes[q]->reads_from.end();++rsii)
1506 printf(" %d",(*rsii));
1507 printf(", and sources-to %d:",qnodes[q]->sources_to.size());
1508 for(rsii=qnodes[q]->sources_to.begin();rsii!=qnodes[q]->sources_to.end();++rsii)
1509 printf(" %d",(*rsii));
1513 for(q=0;q<hfta_sets.size();++q){
1514 if(hfta_sets[q]->do_generation==false)
1516 printf("hfta %d (%s) reads-from %d:",q,hfta_sets[q]->name.c_str(),hfta_sets[q]->reads_from.size());
1517 set<int>::iterator rsii;
1518 for(rsii=hfta_sets[q]->reads_from.begin();rsii!=hfta_sets[q]->reads_from.end();++rsii)
1519 printf(" %d",(*rsii));
1520 printf(", and sources-to %d:",hfta_sets[q]->sources_to.size());
1521 for(rsii=hfta_sets[q]->sources_to.begin();rsii!=hfta_sets[q]->sources_to.end();++rsii)
1522 printf(" %d",(*rsii));
1529 // Re-topo sort the hftas
1530 hfta_topsort.clear();
1532 int hnode_srcs_2[hfta_sets.size()];
1533 for(i=0;i<hfta_sets.size();++i){
1534 hnode_srcs_2[i] = 0;
1535 if(hfta_sets[i]->sources_to.size() == 0 && hfta_sets[i]->do_generation){
1540 while(workq.empty() == false){
1541 int node = workq.front();
1543 hfta_topsort.push_back(node);
1544 set<int>::iterator stsii;
1545 for(stsii=hfta_sets[node]->reads_from.begin();stsii!=hfta_sets[node]->reads_from.end();++stsii){
1546 int child = (*stsii);
1547 hnode_srcs_2[child]++;
1548 if(hnode_srcs_2[child] == hfta_sets[child]->sources_to.size()){
1549 workq.push_back(child);
1554 // Ensure that all of the query_node_indices in hfta_sets are topologically
1555 // sorted, don't rely on assumptions that all transforms maintain some kind of order.
1556 for(i=0;i<hfta_sets.size();++i){
1557 if(hfta_sets[i]->do_generation){
1558 map<int,int> n_accounted;
1559 vector<int> new_order;
1561 vector<int>::iterator vii;
1562 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1563 n_accounted[(*vii)]= 0;
1565 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1566 set<int>::iterator rfsii;
1567 for(rfsii=qnodes[(*vii)]->reads_from.begin();rfsii!=qnodes[(*vii)]->reads_from.end();++rfsii){
1568 if(n_accounted.count((*rfsii)) == 0){
1569 n_accounted[(*vii)]++;
1572 if(n_accounted[(*vii)] == qnodes[(*vii)]->reads_from.size()){
1573 workq.push_back((*vii));
1577 while(workq.empty() == false){
1578 int node = workq.front();
1580 new_order.push_back(node);
1581 set<int>::iterator stsii;
1582 for(stsii=qnodes[node]->sources_to.begin();stsii!=qnodes[node]->sources_to.end();++stsii){
1583 if(n_accounted.count((*stsii))){
1584 n_accounted[(*stsii)]++;
1585 if(n_accounted[(*stsii)] == qnodes[(*stsii)]->reads_from.size()){
1586 workq.push_back((*stsii));
1591 hfta_sets[i]->query_node_indices = new_order;
1599 /// Global checking is done, start the analysis and translation
1600 /// of the query parse tree in the order specified by process_order
1603 // Get a list of the LFTAs for global lfta optimization
1604 // TODO: separate building operators from splitting lftas,
1605 // that will make optimizations such as predicate pushing easier.
1606 vector<stream_query *> lfta_list;
1608 stream_query *rootq;
1612 for(qi=hfta_topsort.size()-1;qi>=0;--qi){
1614 int hfta_id = hfta_topsort[qi];
1615 vector<int> curr_list = hfta_sets[hfta_id]->query_node_indices;
1619 // Two possibilities, either its a UDOP, or its a collection of queries.
1620 // if(qnodes[curr_list.back()]->is_udop)
1621 if(hfta_sets[hfta_id]->is_udop){
1622 int node_id = curr_list.back();
1623 int udop_schref = Schema->find_tbl(qnodes[node_id]->file);
1624 opview_entry *opv = new opview_entry();
1626 // Many of the UDOP properties aren't currently used.
1627 opv->parent_qname = "no_parent";
1628 opv->root_name = qnodes[node_id]->name;
1629 opv->view_name = qnodes[node_id]->file;
1631 sprintf(tmpstr,"%s_UDOP%d_%s",qnodes[node_id]->name.c_str(),qi,opv->view_name.c_str());
1632 opv->udop_alias = tmpstr;
1633 opv->mangler = qnodes[node_id]->mangler;
1635 if(opv->mangler != ""){
1636 int new_udop_schref = Schema->add_duplicate_table(opv->view_name,opv->root_name);
1637 Schema->mangle_subq_names(new_udop_schref,opv->mangler);
1640 // This piece of code makes each hfta which refers to the same udop
1641 // reference a distinct running udop. Do this at query optimization time?
1642 // fmtbl->set_udop_alias(opv->udop_alias);
1644 opv->exec_fl = Schema->get_op_prop(udop_schref, string("file"));
1645 opv->liveness_timeout = atoi(Schema->get_op_prop(udop_schref, string("liveness_timeout")).c_str());
1647 vector<subquery_spec *> subq = Schema->get_subqueryspecs(udop_schref);
1649 for(s=0;s<subq.size();++s){
1650 // Validate that the fields match.
1651 subquery_spec *sqs = subq[s];
1652 string subq_name = sqs->name + opv->mangler;
1653 vector<field_entry *> flds = Schema->get_fields(subq_name);
1654 if(flds.size() == 0){
1655 fprintf(stderr,"INTERNAL ERROR: the subquery %s of view %s not found in Schema.\n",subq_name.c_str(), opv->view_name.c_str());
1658 if(flds.size() < sqs->types.size()){
1659 fprintf(stderr,"ERROR: subquery %s of view %s does not have enough fields (%lu found, %lu expected).\n",subq_name.c_str(), opv->view_name.c_str(),flds.size(), sqs->types.size());
1662 bool failed = false;
1663 for(f=0;f<sqs->types.size();++f){
1664 data_type dte(sqs->types[f],sqs->modifiers[f]);
1665 data_type dtf(flds[f]->get_type(),flds[f]->get_modifier_list());
1666 if(! dte.subsumes_type(&dtf) ){
1667 fprintf(stderr,"ERROR: subquery %s of view %s does not have the correct type for field %d (%s found, %s expected).\n",subq_name.c_str(), opv->view_name.c_str(),f,dtf.to_string().c_str(), dte.to_string().c_str());
1671 if(dte.is_temporal() && (dte.get_temporal() != dtf.get_temporal()) ){
1672 string pstr = dte.get_temporal_string();
1673 fprintf(stderr,"ERROR: subquery %s of view %s does not have the expected temporal value %s of field %d.\n",sqs->name.c_str(), opv->view_name.c_str(),pstr.c_str(),f);
1680 /// Validation done, find the subquery, make a copy of the
1681 /// parse tree, and add it to the return list.
1682 for(q=0;q<qnodes.size();++q)
1683 if(qnodes[q]->name == subq_name)
1685 if(q==qnodes.size()){
1686 fprintf(stderr,"INTERNAL ERROR: subquery %s of view %s not found in list of query names.\n",subq_name.c_str(), opv->view_name.c_str());
1692 // Cross-link to from entry(s) in all sourced-to tables.
1693 set<int>::iterator sii;
1694 for(sii=qnodes[curr_list.back()]->sources_to.begin() ;sii!=qnodes[curr_list.back()]->sources_to.end();++sii){
1695 //printf("\tUDOP %s sources_to %d (%s)\n",hfta_sets[hfta_id]->name.c_str(),(*sii),hfta_sets[(*sii)]->name.c_str());
1696 vector<tablevar_t *> tblvars = qnodes[(*sii)]->parse_tree->get_from()->get_table_list();
1698 for(ii=0;ii<tblvars.size();++ii){
1699 if(tblvars[ii]->schema_name == opv->root_name){
1700 tblvars[ii]->set_opview_idx(opviews.size());
1706 opviews.append(opv);
1709 // Analyze the parse trees in this query,
1710 // put them in rootq
1711 // vector<int> curr_list = process_sets[qi];
1714 ////////////////////////////////////////
1717 //printf("Process set %d, has %d queries\n",qi,curr_list.size());
1718 for(qj=0;qj<curr_list.size();++qj){
1720 fprintf(stderr,"Processing query %s (file %s) is_udop = %d\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(),qnodes[i]->is_udop);
1722 // Select the current query parse tree
1723 table_exp_t *fta_parse_tree = qnodes[i]->parse_tree;
1725 // if hfta only, try to fetch any missing schemas
1726 // from the registry (using the print_schema program).
1727 // Here I use a hack to avoid analyzing the query -- all referenced
1728 // tables must be in the from clause
1729 // If there is a problem loading any table, just issue a warning,
1731 tablevar_list_t *fm = fta_parse_tree->get_from();
1732 vector<string> refd_tbls = fm->get_src_tbls(Schema);
1733 // iterate over all referenced tables
1735 for(t=0;t<refd_tbls.size();++t){
1736 int tbl_ref = Schema->get_table_ref(refd_tbls[t]);
1738 if(tbl_ref < 0){ // if this table is not in the Schema
1741 string cmd="print_schema "+refd_tbls[t];
1742 FILE *schema_in = popen(cmd.c_str(), "r");
1743 if(schema_in == NULL){
1744 fprintf(stderr,"ERROR: cannot execute command %s\n",cmd.c_str());
1746 string schema_instr;
1747 while(fgets(tmpstr,TMPSTRLEN,schema_in)){
1748 schema_instr += tmpstr;
1750 fta_parse_result = new fta_parse_t();
1751 strcpy(tmp_schema_str,schema_instr.c_str());
1752 FtaParser_setstringinput(tmp_schema_str);
1753 if(FtaParserparse()){
1754 fprintf(stderr,"FTA parse failed on result from command %s.\n",cmd.c_str());
1756 if( fta_parse_result->tables != NULL){
1758 for(tl=0;tl<fta_parse_result->tables->size();++tl){
1759 Schema->add_table(fta_parse_result->tables->get_table(tl));
1762 fprintf(stderr,"ERROR command %s returned no tables.\n",cmd.c_str());
1767 fprintf(stderr,"ERROR, query %s (file %s) references stream %s, which is neither a PROTOCOL nor an externally visible query stream.\n", qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), refd_tbls[t].c_str());
1775 // Analyze the query.
1776 query_summary_class *qs = analyze_fta(fta_parse_tree, Schema, Ext_fcns, qnodes[i]->name);
1778 fprintf(stderr,"analyze_fta failed on query %s (file %s) \n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str());
1782 stream_query new_sq(qs, Schema);
1783 if(new_sq.error_code){
1784 fprintf(stderr,"ERROR, can't create query node for query %s (file %s):\n%s\n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str(),new_sq.err_str.c_str());
1788 // Add it to the Schema
1789 table_def *output_td = new_sq.get_output_tabledef();
1790 Schema->add_table(output_td);
1792 // Create a query plan from the analyzed parse tree.
1793 // If it's a query referenced via FROM, add it to the stream query.
1795 rootq->add_query(new_sq);
1797 rootq = new stream_query(new_sq);
1798 // have the stream query object inherit properties form the analyzed
1799 // hfta_node object.
1800 rootq->set_nparallel(hfta_sets[hfta_id]->n_parallel,hfta_sets[hfta_id]->parallel_idx);
1801 rootq->n_successors = hfta_sets[hfta_id]->sources_to.size();
1807 // This stream query has all its parts
1808 // Build and optimize it.
1809 //printf("translate_fta: generating plan.\n");
1810 if(rootq->generate_plan(Schema)){
1811 fprintf(stderr,"INTERNAL ERROR: Can't generate query plan for query %s.\n",rootq->query_name.c_str());
1815 // If we've found the query plan head, so now add the output operators
1816 if(qname_to_ospec.count(hfta_sets[hfta_id]->source_name)){
1817 pair< multimap<string, int>::iterator, multimap<string, int>::iterator > oset;
1818 multimap<string, int>::iterator mmsi;
1819 oset = qname_to_ospec.equal_range(hfta_sets[hfta_id]->source_name);
1820 for(mmsi=oset.first; mmsi!=oset.second; ++mmsi){
1821 rootq->add_output_operator(output_specs[(*mmsi).second]);
1827 // Perform query splitting if necessary.
1829 vector<stream_query *> split_queries = rootq->split_query(Ext_fcns, Schema, hfta_returned, ifaces_db, n_virtual_interfaces, hfta_sets[hfta_id]->n_parallel, hfta_sets[hfta_id]->parallel_idx);
1832 //for(l=0;l<split_queries.size();++l){
1833 //printf("split query %d is %s\n",l,split_queries[l]->q`uery_name.c_str());
1839 if(split_queries.size() > 0){ // should be at least one component.
1841 // Compute the number of LFTAs.
1842 int n_lfta = split_queries.size();
1843 if(hfta_returned) n_lfta--;
1846 // Process the LFTA components.
1847 for(l=0;l<n_lfta;++l){
1848 if(lfta_names.count(split_queries[l]->query_name) == 0){
1849 // Grab the lfta for global optimization.
1850 vector<tablevar_t *> tvec = split_queries[l]->query_plan[0]->get_input_tbls();
1851 string liface = tvec[0]->get_interface();
1852 string lmach = tvec[0]->get_machine();
1855 interface_names.push_back(liface);
1856 machine_names.push_back(lmach);
1857 //printf("Machine is %s\n",lmach.c_str());
1859 // Set the ht size from the recommendation, if there is one in the rec file
1860 if(lfta_htsize.count(split_queries[l]->query_name)>0){
1861 split_queries[l]->query_plan[0]->set_definition("aggregate_slots",int_to_string(lfta_htsize[split_queries[l]->query_name]));
1865 lfta_names[split_queries[l]->query_name] = lfta_list.size();
1866 split_queries[l]->set_gid(lfta_list.size()); // set lfta global id
1867 lfta_list.push_back(split_queries[l]);
1868 lfta_mach_lists[lmach].push_back(split_queries[l]);
1870 // The following is a hack,
1871 // as I should be generating LFTA code through
1872 // the stream_query object.
1873 split_queries[l]->query_plan[0]->bind_to_schema(Schema);
1874 // split_queries[l]->query_plan[0]->definitions = split_queries[l]->defines;
1877 // Create query description to embed in lfta.c
1878 string lfta_schema_str = split_queries[l]->make_schema();
1879 string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
1881 // get NIC capabilities.
1883 nic_property *nicprop = NULL;
1884 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
1885 if(iface_codegen_type.size()){
1886 nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
1888 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
1893 lfta_val[lmach] += generate_lfta_block(split_queries[l]->query_plan[0], Schema, split_queries[l]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop);
1896 snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema));
1897 query_names.push_back(split_queries[l]->query_name);
1898 mach_query_names[lmach].push_back(query_names.size()-1);
1899 // NOTE: I will assume a 1-1 correspondence between
1900 // mach_query_names[lmach] and lfta_mach_lists[lmach]
1901 // where mach_query_names[lmach][i] contains the index into
1902 // query_names, which names the lfta, and
1903 // mach_query_names[lmach][i] is the stream_query * of the
1904 // corresponding lfta.
1905 // Later, lfta_iface_qnames are the query names matching lfta_iface_lists
1909 // check if lfta is reusable
1910 // FTA is reusable if reusable option is set explicitly or it doesn't have any parameters
1912 bool lfta_reusable = false;
1913 if (split_queries[l]->query_plan[0]->get_val_of_def("reusable") == "yes" ||
1914 split_queries[l]->query_plan[0]->get_param_tbl()->size() == 0) {
1915 lfta_reusable = true;
1917 lfta_reuse_options.push_back(lfta_reusable);
1919 // LFTA will inherit the liveness timeout specification from the containing query
1920 // it is too conservative as lfta are expected to spend less time per tuple
1923 // extract liveness timeout from query definition
1924 int liveness_timeout = atoi(split_queries[l]->query_plan[0]->get_val_of_def("liveness_timeout").c_str());
1925 if (!liveness_timeout) {
1926 // fprintf(stderr, "WARNING: No liveness timeout specified for lfta %s, using default value of %d\n",
1927 // split_queries[l]->query_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
1928 liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
1930 lfta_liveness_timeouts.push_back(liveness_timeout);
1932 // Add it to the schema
1933 table_def *td = split_queries[l]->get_output_tabledef();
1934 Schema->append_table(td);
1935 //printf("added lfta %d (%s)\n",l,split_queries[l]->query_name.c_str());
1940 // If the output is lfta-only, dump out the query name.
1941 if(split_queries.size() == 1 && !hfta_returned){
1942 if(output_query_names ){
1943 fprintf(query_name_output,"%s L\n",split_queries[0]->query_name.c_str());
1947 fprintf(stderr,"query name is %s\n",split_queries[0]->query_name.c_str());
1952 // output schema summary
1953 if(output_schema_summary){
1954 dump_summary(split_queries[0]);
1960 if(hfta_returned){ // query also has an HFTA component
1961 int hfta_nbr = split_queries.size()-1;
1963 hfta_list.push_back(split_queries[hfta_nbr]);
1965 // report on generated query names
1966 if(output_query_names){
1967 string hfta_name =split_queries[hfta_nbr]->query_name;
1968 fprintf(query_name_output,"%s H\n",hfta_name.c_str());
1969 for(l=0;l<hfta_nbr;++l){
1970 string lfta_name =split_queries[l]->query_name;
1971 fprintf(query_name_output,"%s L\n",lfta_name.c_str());
1975 // fprintf(stderr,"query names are ");
1976 // for(l=0;l<hfta_nbr;++l){
1977 // if(l>0) fprintf(stderr,",");
1978 // string fta_name =split_queries[l]->query_name;
1979 // fprintf(stderr," %s",fta_name.c_str());
1981 // fprintf(stderr,"\n");
1986 fprintf(stderr,"ERROR, query in file %s has a bug.\n",qnodes[i]->file.c_str());
1987 fprintf(stderr,"%s\n",rootq->get_error_str().c_str());
1994 //-----------------------------------------------------------------
1995 // Compute and propagate the SE in PROTOCOL fields compute a field.
1996 //-----------------------------------------------------------------
1998 for(i=0;i<lfta_list.size();i++){
1999 lfta_list[i]->generate_protocol_se(sq_map, Schema);
2000 sq_map[lfta_list[i]->get_sq_name()] = lfta_list[i];
2002 for(i=0;i<hfta_list.size();i++){
2003 hfta_list[i]->generate_protocol_se(sq_map, Schema);
2004 sq_map[hfta_list[i]->get_sq_name()] = hfta_list[i];
2009 //------------------------------------------------------------------------
2010 // Perform individual FTA optimizations
2011 //-----------------------------------------------------------------------
2013 if (partitioned_mode) {
2015 // open partition definition file
2016 string part_fname = config_dir_path + "partition.txt";
2018 FILE* partfd = fopen(part_fname.c_str(), "r");
2020 fprintf(stderr,"ERROR, unable to open partition definition file %s\n",part_fname.c_str());
2023 PartnParser_setfileinput(partfd);
2024 if (PartnParserparse()) {
2025 fprintf(stderr,"ERROR, unable to parse partition definition file %s\n",part_fname.c_str());
2032 print_hftas("preopt_hfta_info.txt", hfta_list, lfta_names, query_names, interface_names);
2034 int num_hfta = hfta_list.size();
2035 for(i=0; i < hfta_list.size(); ++i){
2036 hfta_list[i]->optimize(hfta_list, lfta_names, interface_names, machine_names, Ext_fcns, Schema, ifaces_db, partn_parse_result);
2039 // Add all new hftas to schema
2040 for(i=num_hfta; i < hfta_list.size(); ++i){
2041 table_def *td = hfta_list[i]->get_output_tabledef();
2042 Schema->append_table(td);
2045 print_hftas("postopt_hfta_info.txt", hfta_list, lfta_names, query_names, interface_names);
2049 //------------------------------------------------------------------------
2050 // Do global (cross-fta) optimization
2051 //-----------------------------------------------------------------------
2058 set<string> extra_external_libs;
2060 for(i=0;i<hfta_list.size();++i){ // query also has an HFTA component
2063 // build hfta file name, create output
2064 if(numeric_hfta_flname){
2065 sprintf(tmpstr,"hfta_%d",hfta_count);
2066 hfta_names.push_back(tmpstr);
2067 sprintf(tmpstr,"hfta_%d.cc",hfta_count);
2069 sprintf(tmpstr,"hfta_%s",hfta_list[i]->query_name.c_str());
2070 hfta_names.push_back(tmpstr);
2071 sprintf(tmpstr,"hfta_%s.cc",hfta_list[i]->query_name.c_str());
2073 FILE *hfta_fl = fopen(tmpstr,"w");
2074 if(hfta_fl == NULL){
2075 fprintf(stderr,"ERROR can't open fta output file %s\n",tmpstr);
2078 fprintf(hfta_fl,"%s\n\n",hfta_list[i]->generate_hfta(Schema, Ext_fcns, opviews, distributed_mode).c_str());
2080 // If there is a field verifier, warn about
2081 // lack of compatibility
2082 // NOTE : this code assumes that visible non-lfta queries
2083 // are those at the root of a stream query.
2084 string hfta_comment;
2086 string hfta_namespace;
2087 if(hfta_list[i]->defines.count("comment")>0)
2088 hfta_comment = hfta_list[i]->defines["comment"];
2089 if(hfta_list[i]->defines.count("Comment")>0)
2090 hfta_comment = hfta_list[i]->defines["Comment"];
2091 if(hfta_list[i]->defines.count("COMMENT")>0)
2092 hfta_comment = hfta_list[i]->defines["COMMENT"];
2093 if(hfta_list[i]->defines.count("title")>0)
2094 hfta_title = hfta_list[i]->defines["title"];
2095 if(hfta_list[i]->defines.count("Title")>0)
2096 hfta_title = hfta_list[i]->defines["Title"];
2097 if(hfta_list[i]->defines.count("TITLE")>0)
2098 hfta_title = hfta_list[i]->defines["TITLE"];
2099 if(hfta_list[i]->defines.count("namespace")>0)
2100 hfta_namespace = hfta_list[i]->defines["namespace"];
2101 if(hfta_list[i]->defines.count("Namespace")>0)
2102 hfta_namespace = hfta_list[i]->defines["Namespace"];
2103 if(hfta_list[i]->defines.count("Namespace")>0)
2104 hfta_namespace = hfta_list[i]->defines["Namespace"];
2106 if(field_verifier != NULL){
2108 if(hfta_comment == "")
2109 warning_str += "\tcomment not found.\n";
2110 if(hfta_title == "")
2111 warning_str += "\ttitle not found.\n";
2112 if(hfta_namespace == "")
2113 warning_str += "\tnamespace not found.\n";
2115 vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2117 for(fi=0;fi<flds.size();fi++){
2118 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
2120 if(warning_str != "")
2121 fprintf(stderr,"Warning, in HFTA stream %s:\n%s",
2122 hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(),warning_str.c_str());
2125 fprintf(qtree_output,"\t<HFTA name='%s'>\n",hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
2126 if(hfta_comment != "")
2127 fprintf(qtree_output,"\t\t<Description value='%s' />\n",hfta_comment.c_str());
2128 if(hfta_title != "")
2129 fprintf(qtree_output,"\t\t<Title value='%s' />\n",hfta_title.c_str());
2130 if(hfta_namespace != "")
2131 fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",hfta_namespace.c_str());
2132 fprintf(qtree_output,"\t\t<FileName value='%s' />\n",tmpstr);
2133 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2135 // write info about fields to qtree.xml
2136 vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2138 for(fi=0;fi<flds.size();fi++){
2139 fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
2140 if(flds[fi]->get_modifier_list()->size()){
2141 fprintf(qtree_output,"mods='%s' ", flds[fi]->get_modifier_list()->to_string().c_str());
2143 fprintf(qtree_output," />\n");
2146 // extract liveness timeout from query definition
2147 int liveness_timeout = atoi(hfta_list[i]->query_plan[hfta_list[i]->qhead]->get_val_of_def("liveness_timeout").c_str());
2148 if (!liveness_timeout) {
2149 // fprintf(stderr, "WARNING: No liveness timeout specified for hfta %s, using default value of %d\n",
2150 // hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2151 liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2153 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", liveness_timeout);
2155 vector<tablevar_t *> tmp_tv = hfta_list[i]->get_input_tables();
2157 for(itv=0;itv<tmp_tv.size();++itv){
2158 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",tmp_tv[itv]->get_schema_name().c_str());
2160 string ifrs = hfta_list[i]->collect_refd_ifaces();
2162 fprintf(qtree_output,"\t\t<Ifaces_referenced value='%s'/>\n",ifrs.c_str());
2164 fprintf(qtree_output,"\t</HFTA>\n");
2168 // debug only -- do code generation to catch generation-time errors.
2169 hfta_list[i]->generate_hfta(Schema, Ext_fcns,opviews, distributed_mode);
2172 hfta_count++; // for hfta file names with numeric suffixes
2174 hfta_list[i]->get_external_libs(extra_external_libs);
2178 string ext_lib_string;
2179 set<string>::iterator ssi_el;
2180 for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_el)
2181 ext_lib_string += (*ssi_el)+" ";
2185 // Report on the set of operator views
2186 for(i=0;i<opviews.size();++i){
2187 opview_entry *opve = opviews.get_entry(i);
2188 fprintf(qtree_output,"\t<UDOP name='%s' >\n",opve->view_name.c_str());
2189 fprintf(qtree_output,"\t\t<FileName value='%s' />\n",opve->exec_fl.c_str());
2190 fprintf(qtree_output,"\t\t<Parent value='%s' />\n",opve->root_name.c_str());
2191 fprintf(qtree_output,"\t\t<Alias value='%s' />\n",opve->udop_alias.c_str());
2192 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2194 if (!opve->liveness_timeout) {
2195 // fprintf(stderr, "WARNING: No liveness timeout specified for view %s, using default value of %d\n",
2196 // opve->view_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2197 opve->liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2199 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", opve->liveness_timeout);
2201 for(j=0;j<opve->subq_names.size();j++)
2202 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",opve->subq_names[j].c_str());
2203 fprintf(qtree_output,"\t</UDOP>\n");
2207 //-----------------------------------------------------------------
2209 // Create interface-specific meta code files.
2210 // first, open and parse the interface resources file.
2211 ifaces_db = new ifq_t();
2213 if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
2214 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
2215 ifx_fname.c_str(), ierr.c_str());
2219 map<string, vector<stream_query *> >::iterator svsi;
2220 for(svsi=lfta_mach_lists.begin(); svsi!=lfta_mach_lists.end(); ++svsi){
2221 string lmach = (*svsi).first;
2223 // For this machine, create a set of lftas per interface.
2224 vector<stream_query *> mach_lftas = (*svsi).second;
2225 map<string, vector<stream_query *> > lfta_iface_lists;
2227 for(li=0;li<mach_lftas.size();++li){
2228 vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
2229 string lfta_iface = tvec[0]->get_interface();
2230 lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2233 map<string, vector<stream_query *> >::iterator lsvsi;
2234 for(lsvsi=lfta_iface_lists.begin(); lsvsi!=lfta_iface_lists.end(); ++lsvsi){
2236 string liface = (*lsvsi).first;
2237 vector<stream_query *> iface_lftas = (*lsvsi).second;
2238 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2239 if(iface_codegen_type.size()){
2240 nic_property *nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2242 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2245 string mcs = generate_nic_code(iface_lftas, nicprop);
2248 mcf_flnm = lmach + "_"+liface+".mcf";
2250 mcf_flnm = hostname + "_"+liface+".mcf";
2252 if((mcf_fl = fopen(mcf_flnm.c_str(),"w")) == NULL){
2253 fprintf(stderr,"Can't open output file %s\n%s\n",mcf_flnm.c_str(),strerror(errno));
2256 fprintf(mcf_fl,"%s",mcs.c_str());
2258 //printf("mcs of machine %s, iface %s of type %s is \n%s\n",
2259 //lmach.c_str(), liface.c_str(),iface_codegen_type[0].c_str(), mcs.c_str());
2268 //-----------------------------------------------------------------
2271 // Find common filter predicates in the LFTAs.
2272 // in addition generate structs to store the temporal attributes unpacked by prefilter
2274 map<string, vector<stream_query *> >::iterator ssqi;
2275 for(ssqi=lfta_mach_lists.begin(); ssqi!=lfta_mach_lists.end(); ++ssqi){
2277 string lmach = (*ssqi).first;
2278 bool packed_return = false;
2282 // The LFTAs of this machine.
2283 vector<stream_query *> mach_lftas = (*ssqi).second;
2284 // break up on a per-interface basis.
2285 map<string, vector<stream_query *> > lfta_iface_lists;
2286 map<string, vector<int> > lfta_iface_qname_ix; // need the query name
2288 for(li=0;li<mach_lftas.size();++li){
2289 vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
2290 string lfta_iface = tvec[0]->get_interface();
2291 lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2292 lfta_iface_qname_ix[lfta_iface].push_back(mach_query_names[lmach][li]);
2296 // Are the return values "packed"?
2297 // This should be done on a per-interface basis.
2298 // But this is defunct code for gs-lite
2299 for(li=0;li<mach_lftas.size();++li){
2300 vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
2301 string liface = tvec[0]->get_interface();
2302 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2303 if(iface_codegen_type.size()){
2304 if(npdb->get_nic_prop_val(iface_codegen_type[0],"Return",erri) == "Packed"){
2305 packed_return = true;
2311 // Separate lftas by interface, collect results on a per-interface basis.
2313 vector<cnf_set *> no_preds; // fallback if there is no prefilter
2314 map<string, vector<cnf_set *> > prefilter_preds;
2315 set<unsigned int> pred_ids; // this can be global for all interfaces
2316 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2317 string liface = (*mvsi).first;
2318 vector<cnf_set *> empty_list;
2319 prefilter_preds[liface] = empty_list;
2320 if(! packed_return){
2321 get_common_lfta_filter(lfta_iface_lists[liface], Schema,Ext_fcns, prefilter_preds[liface], pred_ids);
2324 // get NIC capabilities. (Is this needed?)
2325 nic_property *nicprop = NULL;
2326 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2327 if(iface_codegen_type.size()){
2328 nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2330 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2337 // Now that we know the prefilter preds, generate the lfta code.
2338 // Do this for all lftas in this machine.
2339 for(li=0;li<mach_lftas.size();++li){
2340 set<unsigned int> subsumed_preds;
2341 set<unsigned int>::iterator sii;
2343 for(sii=pred_ids.begin();sii!=pred_ids.end();++sii){
2345 if((pid>>16) == li){
2346 subsumed_preds.insert(pid & 0xffff);
2350 string lfta_schema_str = mach_lftas[li]->make_schema();
2351 string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
2352 nic_property *nicprop = NULL; // no NIC properties?
2353 lfta_val[lmach] += generate_lfta_block(mach_lftas[li]->query_plan[0], Schema, mach_lftas[li]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop,subsumed_preds);
2357 // generate structs to store the temporal attributes
2358 // unpacked by prefilter
2359 col_id_set temp_cids;
2360 get_prefilter_temporal_cids(lfta_mach_lists[lmach], temp_cids);
2361 lfta_prefilter_val[lmach] = generate_lfta_prefilter_struct(temp_cids, Schema);
2363 // Compute the lfta bit signatures and the lfta colrefs
2364 // do this on a per-interface basis
2366 lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2368 map<string, vector<long long int> > lfta_sigs; // used again later
2369 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2370 string liface = (*mvsi).first;
2371 vector<long long int> empty_list;
2372 lfta_sigs[liface] = empty_list;
2374 vector<col_id_set> lfta_cols;
2375 vector<int> lfta_snap_length;
2376 for(li=0;li<lfta_iface_lists[liface].size();++li){
2377 unsigned long long int mask=0, bpos=1;
2379 for(f_pos=0;f_pos<prefilter_preds[liface].size();++f_pos){
2380 if(prefilter_preds[liface][f_pos]->lfta_id.count(li))
2384 lfta_sigs[liface].push_back(mask);
2385 lfta_cols.push_back(lfta_iface_lists[liface][li]->query_plan[0]->get_colrefs(true,Schema));
2386 lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema));
2389 //for(li=0;li<mach_lftas.size();++li){
2390 //printf("lfta %d, msak is %llu\n",li,lfta_sigs[li]);
2391 //col_id_set::iterator tcisi;
2392 //for(tcisi=lfta_cols[li].begin(); tcisi!=lfta_cols[li].end();++tcisi){
2393 //printf("\t%s %d (%d)\n",(*tcisi).field.c_str(),(*tcisi).schema_ref, (*tcisi).tblvar_ref);
2398 // generate the prefilter
2399 // Do this on a per-interface basis, except for the #define
2401 // lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2402 lfta_val[lmach] += generate_lfta_prefilter(prefilter_preds[liface], temp_cids, Schema, Ext_fcns, lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2404 lfta_val[lmach] += generate_lfta_prefilter(no_preds, temp_cids, Schema,Ext_fcns, lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2409 // Generate interface parameter lookup function
2410 lfta_val[lmach] += "// lookup interface properties by name\n";
2411 lfta_val[lmach] += "// mulitple values of given property returned in the same string separated by commas\n";
2412 lfta_val[lmach] += "// returns NULL if given property does not exist\n";
2413 lfta_val[lmach] += "gs_sp_t get_iface_properties (const gs_sp_t iface_name, const gs_sp_t property_name) {\n";
2415 // collect a list of interface names used by queries running on this host
2416 set<std::string> iface_names;
2417 for(i=0;i<mach_query_names[lmach].size();i++){
2418 int mi = mach_query_names[lmach][i];
2419 stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2421 if(interface_names[mi]=="")
2422 iface_names.insert("DEFAULTDEV");
2424 iface_names.insert(interface_names[mi]);
2427 // generate interface property lookup code for every interface
2428 set<std::string>::iterator sir;
2429 for (sir = iface_names.begin(); sir != iface_names.end(); sir++) {
2430 if (sir == iface_names.begin())
2431 lfta_val[lmach] += "\tif (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2433 lfta_val[lmach] += "\t} else if (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2435 // iterate through interface properties
2436 vector<string> iface_properties = ifaces_db->get_iface_properties(lmach,*sir,erri,err_str);
2438 fprintf(stderr,"ERROR cannot retrieve interface properties for %s.%s, %s\n",lmach.c_str(), sir->c_str(), err_str.c_str());
2441 if (iface_properties.empty())
2442 lfta_val[lmach] += "\t\treturn NULL;\n";
2444 for (int i = 0; i < iface_properties.size(); ++i) {
2446 lfta_val[lmach] += "\t\tif (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2448 lfta_val[lmach] += "\t\t} else if (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2450 // combine all values for the interface property using comma separator
2451 vector<string> vals = ifaces_db->get_iface_vals(lmach,*sir,iface_properties[i],erri,err_str);
2452 for (int j = 0; j < vals.size(); ++j) {
2453 lfta_val[lmach] += "\t\t\treturn \"" + vals[j];
2454 if (j != vals.size()-1)
2455 lfta_val[lmach] += ",";
2457 lfta_val[lmach] += "\";\n";
2459 lfta_val[lmach] += "\t\t} else\n";
2460 lfta_val[lmach] += "\t\t\treturn NULL;\n";
2463 lfta_val[lmach] += "\t} else\n";
2464 lfta_val[lmach] += "\t\treturn NULL;\n";
2465 lfta_val[lmach] += "}\n\n";
2468 // Generate a full list of FTAs for clearinghouse reference
2469 lfta_val[lmach] += "// list of FTAs clearinghouse expects to register themselves\n";
2470 lfta_val[lmach] += "const gs_sp_t fta_names[] = {";
2472 for (i = 0; i < query_names.size(); ++i) {
2474 lfta_val[lmach] += ", ";
2475 lfta_val[lmach] += "\"" + query_names[i] + "\"";
2477 for (i = 0; i < hfta_list.size(); ++i) {
2478 lfta_val[lmach] += ", \"" + hfta_list[i]->query_name + "\"";
2480 lfta_val[lmach] += ", NULL};\n\n";
2483 // Add the initialization function to lfta.c
2484 // Change to accept the interface name, and
2485 // set the prefilter function accordingly.
2486 // see the example in demo/err2
2487 lfta_val[lmach] += "void fta_init(gs_sp_t device){\n";
2489 // for(i=0;i<mach_query_names[lmach].size();i++)
2490 // int mi = mach_query_names[lmach][i];
2491 // stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2493 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2494 string liface = (*mvsi).first;
2495 vector<stream_query *> lfta_list = (*mvsi).second;
2496 for(i=0;i<lfta_list.size();i++){
2497 stream_query *lfta_sq = lfta_list[i];
2498 int mi = lfta_iface_qname_ix[liface][i];
2500 fprintf(stderr,"interface %s, LFTA %d, snap length is %d\n",liface.c_str(),mi,snap_lengths[mi]);
2502 lfta_val[lmach] += "\tfta_register(\""+query_names[mi]+"\", " + (lfta_reuse_options[mi]?"1":"0") + ", ";
2503 if(interface_names[mi]=="")
2504 lfta_val[lmach]+="DEFAULTDEV";
2506 lfta_val[lmach]+='"'+interface_names[mi]+'"';
2508 lfta_val[lmach] += ", "+generate_alloc_name(query_names[mi])
2509 +"\n#ifndef LFTA_IN_NIC\n\t,"+generate_schema_string_name(query_names[mi])
2511 sprintf(tmpstr,",%d",snap_lengths[mi]);
2512 lfta_val[lmach] += tmpstr;
2514 // unsigned long long int mask=0, bpos=1;
2516 // for(f_pos=0;f_pos<prefilter_preds.size();++f_pos){
2517 // if(prefilter_preds[f_pos]->lfta_id.count(i))
2519 // bpos = bpos << 1;
2523 // sprintf(tmpstr,",%lluull",mask);
2524 sprintf(tmpstr,",%lluull",lfta_sigs[liface][i]);
2525 lfta_val[lmach]+=tmpstr;
2527 lfta_val[lmach] += ",0ull";
2530 lfta_val[lmach] += ");\n";
2534 // End of lfta prefilter stuff
2535 // --------------------------------------------------
2537 // If there is a field verifier, warn about
2538 // lack of compatibility
2539 string lfta_comment;
2541 string lfta_namespace;
2542 map<string,string> ldefs = lfta_sq->query_plan[0]->get_definitions();
2543 if(ldefs.count("comment")>0)
2544 lfta_comment = lfta_sq->defines["comment"];
2545 if(ldefs.count("Comment")>0)
2546 lfta_comment = lfta_sq->defines["Comment"];
2547 if(ldefs.count("COMMENT")>0)
2548 lfta_comment = lfta_sq->defines["COMMENT"];
2549 if(ldefs.count("title")>0)
2550 lfta_title = lfta_sq->defines["title"];
2551 if(ldefs.count("Title")>0)
2552 lfta_title = lfta_sq->defines["Title"];
2553 if(ldefs.count("TITLE")>0)
2554 lfta_title = lfta_sq->defines["TITLE"];
2555 if(ldefs.count("NAMESPACE")>0)
2556 lfta_namespace = lfta_sq->defines["NAMESPACE"];
2557 if(ldefs.count("Namespace")>0)
2558 lfta_namespace = lfta_sq->defines["Namespace"];
2559 if(ldefs.count("namespace")>0)
2560 lfta_namespace = lfta_sq->defines["namespace"];
2562 string lfta_ht_size;
2563 if(lfta_sq->query_plan[0]->node_type()== "sgah_qpn" || lfta_sq->query_plan[0]->node_type()== "rsgah_qpn")
2564 lfta_ht_size = int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE);
2565 if(ldefs.count("aggregate_slots")>0){
2566 lfta_ht_size = ldefs["aggregate_slots"];
2569 // NOTE : I'm assuming that visible lftas do not start with _fta.
2570 // -- will fail for non-visible simple selection queries.
2571 if(field_verifier != NULL && query_names[mi].substr(0,1) != "_"){
2573 if(lfta_comment == "")
2574 warning_str += "\tcomment not found.\n";
2575 if(lfta_title == "")
2576 warning_str += "\ttitle not found.\n";
2577 if(lfta_namespace == "")
2578 warning_str += "\tnamespace not found.\n";
2580 vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
2582 for(fi=0;fi<flds.size();fi++){
2583 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
2585 if(warning_str != "")
2586 fprintf(stderr,"Warning, in LFTA stream %s:\n%s",
2587 query_names[mi].c_str(),warning_str.c_str());
2591 // Create qtree output
2592 fprintf(qtree_output,"\t<LFTA name='%s' >\n",query_names[mi].c_str());
2593 if(lfta_comment != "")
2594 fprintf(qtree_output,"\t\t<Description value='%s' />\n",lfta_comment.c_str());
2595 if(lfta_title != "")
2596 fprintf(qtree_output,"\t\t<Title value='%s' />\n",lfta_title.c_str());
2597 if(lfta_namespace != "")
2598 fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",lfta_namespace.c_str());
2599 if(lfta_ht_size != "")
2600 fprintf(qtree_output,"\t\t<HtSize value='%s' />\n",lfta_ht_size.c_str());
2602 fprintf(qtree_output,"\t\t<Host value='%s' />\n",lmach.c_str());
2604 fprintf(qtree_output,"\t\t<Host value='%s' />\n",hostname.c_str());
2605 fprintf(qtree_output,"\t\t<Interface value='%s' />\n",interface_names[mi].c_str());
2606 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",interface_names[mi].c_str());
2607 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2608 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", lfta_liveness_timeouts[mi]);
2609 // write info about fields to qtree.xml
2610 vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
2612 for(fi=0;fi<flds.size();fi++){
2613 fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
2614 if(flds[fi]->get_modifier_list()->size()){
2615 fprintf(qtree_output,"mods='%s' ", flds[fi]->get_modifier_list()->to_string().c_str());
2617 fprintf(qtree_output," />\n");
2619 fprintf(qtree_output,"\t</LFTA>\n");
2625 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2626 string liface = (*mvsi).first;
2628 " if (!strcmp(device, \""+liface+"\")) \n"
2629 " lfta_prefilter = &lfta_prefilter_"+liface+"; \n"
2633 " if(lfta_prefilter == NULL){\n"
2634 " fprintf(stderr, \"Error, interface %s not found in fta_init.\\n\",device);\n"
2641 lfta_val[lmach] += "}\n\n";
2643 if(!(debug_only || hfta_only) ){
2646 lfta_flnm = lmach + "_lfta.c";
2648 lfta_flnm = hostname + "_lfta.c";
2649 if((lfta_out = fopen(lfta_flnm.c_str(),"w")) == NULL){
2650 fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
2653 fprintf(lfta_out,"%s",lfta_header.c_str());
2654 fprintf(lfta_out,"%s",lfta_prefilter_val[lmach].c_str());
2655 fprintf(lfta_out,"%s",lfta_val[lmach].c_str());
2660 // Say what are the operators which must execute
2661 if(opviews.size()>0)
2662 fprintf(stderr,"The queries use the following external operators:\n");
2663 for(i=0;i<opviews.size();++i){
2664 opview_entry *opv = opviews.get_entry(i);
2665 fprintf(stderr,"\t%s\n",opv->view_name.c_str());
2669 generate_makefile(input_file_names, nfiles, hfta_names, opviews,
2670 machine_names, schema_file_name,
2672 ifaces_db, config_dir_path,use_pads,ext_lib_string, rts_hload);
2675 fprintf(qtree_output,"</QueryNodes>\n");
2680 ////////////////////////////////////////////////////////////
2682 void generate_makefile(vector<string> &input_file_names, int nfiles,
2683 vector<string> &hfta_names, opview_set &opviews,
2684 vector<string> &machine_names,
2685 string schema_file_name,
2686 vector<string> &interface_names,
2687 ifq_t *ifdb, string &config_dir_path,
2690 map<string, vector<int> > &rts_hload
2694 if(config_dir_path != ""){
2695 config_dir_path = "-C "+config_dir_path;
2699 bool libz_exists = stat((root_path+"/lib/libz.a").c_str(),&sb) >= 0;
2700 bool libast_exists = stat((root_path+"/lib/libast.a").c_str(),&sb) >= 0;
2702 // if(libz_exists && !libast_exists){
2703 // fprintf(stderr,"Configuration ERROR: ../../lib/libz.a exists but ../../lib/libast.a dows not.\n");
2707 // Get set of operator executable files to run
2709 set<string>::iterator ssi;
2710 for(i=0;i<opviews.size();++i){
2711 opview_entry *opv = opviews.get_entry(i);
2712 if(opv->exec_fl != "") op_fls.insert(opv->exec_fl);
2715 FILE *outfl = fopen("Makefile", "w");
2717 fprintf(stderr,"Can't open Makefile for write, exiting.\n");
2722 ("CPP= g++ -O3 -g -I "+root_path+"/include -I "+root_path+"/include/hfta\n"
2723 "CC= gcc -O3 -g -I . -I "+root_path+"/include -I "+root_path+"/include/lfta"
2727 fprintf(outfl," -DLFTA_STATS");
2733 for(i=0;i<hfta_names.size();++i)
2734 fprintf(outfl," %s",hfta_names[i].c_str());
2738 "rts: lfta.o "+root_path+"/lib/libgscphost.a "+root_path+"/lib/libgscplftaaux.a "+root_path+"/lib/libgscprts.a "+root_path+"/lib/libclearinghouse.a\n"
2739 "\tg++ -O3 -g -o rts lfta.o ").c_str(), outfl);
2741 fprintf(outfl,"-L. ");
2743 ("-L"+root_path+"/lib -lgscplftaaux ").c_str(), outfl);
2745 fprintf(outfl,"-lgscppads -lpads ");
2747 "-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux -lclearinghouse -lresolv -lpthread -lgscpinterface -lz");
2749 fprintf(outfl, "-lpz -lz -lbz ");
2750 if(libz_exists && libast_exists)
2751 fprintf(outfl,"-last ");
2753 fprintf(outfl, "-ldll -ldl ");
2754 fprintf(outfl," -lgscpaux");
2756 fprintf(outfl," -fprofile-arcs");
2761 "lfta.o: %s_lfta.c\n"
2762 "\t$(CC) -o lfta.o -c %s_lfta.c\n"
2764 "%s_lfta.c: external_fcns.def %s ",hostname.c_str(), hostname.c_str(), hostname.c_str(),schema_file_name.c_str());
2765 for(i=0;i<nfiles;++i)
2766 fprintf(outfl," %s",input_file_names[i].c_str());
2768 fprintf(outfl,"\n\t%s/bin/translate_fta %s %s ",root_path.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
2770 fprintf(outfl,"\n\t%s/bin/translate_fta -h %s %s %s ", root_path.c_str(), hostname.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
2772 for(i=0;i<nfiles;++i)
2773 fprintf(outfl," %s",input_file_names[i].c_str());
2774 fprintf(outfl,"\n");
2776 for(i=0;i<hfta_names.size();++i)
2779 "\t$(CPP) -o %s %s.o -L"+root_path+"/lib -lgscpapp -lgscphostaux -lgscphost -lgscpinterface -lgscphftaaux -lgscphostaux -lm -lgscpaux -lclearinghouse -lresolv -lpthread -lgscpaux -lgscphftaaux -lgscpaux %s\n"
2782 "\t$(CPP) -o %s.o -c %s.cc\n"
2785 hfta_names[i].c_str(), hfta_names[i].c_str(),
2786 hfta_names[i].c_str(), hfta_names[i].c_str(), extra_libs.c_str(),
2787 hfta_names[i].c_str(), hfta_names[i].c_str(),
2788 hfta_names[i].c_str(), hfta_names[i].c_str()
2793 "packet_schema.txt:\n"
2794 "\tln -s "+root_path+"/cfg/packet_schema.txt .\n"
2796 "external_fcns.def:\n"
2797 "\tln -s "+root_path+"/cfg/external_fcns.def .\n"
2800 "\trm -rf core rts *.o %s_lfta.c external_fcns.def packet_schema.txt").c_str(),hostname.c_str());
2801 for(i=0;i<hfta_names.size();++i)
2802 fprintf(outfl," %s %s.cc",hfta_names[i].c_str(),hfta_names[i].c_str());
2803 fprintf(outfl,"\n");
2809 // Gather the set of interfaces
2810 // TODO : must update to handle machines
2811 // TODO : lookup interface attributes and add them as a parameter to rts process
2812 outfl = fopen("runit", "w");
2814 fprintf(stderr,"Can't open runit for write, exiting.\n");
2818 // Gather the set of interfaces
2819 // Also, gather "base interface names" for use in computing
2820 // the hash splitting to virtual interfaces.
2821 // TODO : must update to handle machines
2823 set<string> base_vifaces; // base interfaces of virtual interfaces
2824 map<string, string> ifmachines;
2825 map<string, string> ifattrs;
2826 for(i=0;i<interface_names.size();++i){
2827 ifaces.insert(interface_names[i]);
2828 ifmachines[interface_names[i]] = machine_names[i];
2830 size_t Xpos = interface_names[i].find_last_of("X");
2831 if(Xpos!=string::npos){
2832 string iface = interface_names[i].substr(0,Xpos);
2833 base_vifaces.insert(iface);
2835 // get interface attributes and add them to the list
2841 +root_path+"/bin/gshub.py> /dev/null 2>&1 &\n"
2843 "if [ ! -f gshub.log ]\n"
2845 "\techo \"Failed to start bin/gshub.py\"\n"
2848 "ADDR=`cat gshub.log`\n"
2849 "ps opgid= $! >> gs.pids\n"
2850 "./rts $ADDR default ").c_str(), outfl);
2853 for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
2854 string ifnm = (*ssi);
2855 fprintf(outfl, "%s ",ifnm.c_str());
2856 vector<string> ifv = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "Command", erri, err_str);
2857 for(j=0;j<ifv.size();++j)
2858 fprintf(outfl, "%s ",ifv[j].c_str());
2860 fprintf(outfl, " &\n");
2861 fprintf(outfl, "echo $! >> gs.pids\n");
2862 for(i=0;i<hfta_names.size();++i)
2863 fprintf(outfl,"./%s $ADDR default &\n",hfta_names[i].c_str());
2865 for(j=0;j<opviews.opview_list.size();++j){
2866 fprintf(outfl,"%s/views/%s %s &\n",root_path.c_str(),opviews.opview_list[j]->exec_fl.c_str(), opviews.opview_list[j]->mangler.c_str());
2870 system("chmod +x runit");
2872 outfl = fopen("stopit", "w");
2874 fprintf(stderr,"Can't open stopit for write, exiting.\n");
2878 fprintf(outfl,"#!/bin/sh\n"
2880 "if [ ! -f gs.pids ]\n"
2884 "for pgid in `cat gs.pids`\n"
2886 "kill -TERM -$pgid\n"
2889 "for pgid in `cat gs.pids`\n"
2896 system("chmod +x stopit");
2898 //-----------------------------------------------
2900 /* For now disable support for virtual interfaces
2901 outfl = fopen("set_vinterface_hash.bat", "w");
2903 fprintf(stderr,"Can't open set_vinterface_hash.bat for write, exiting.\n");
2907 // The format should be determined by an entry in the ifres.xml file,
2908 // but for now hardcode the only example I have.
2909 for(ssi=base_vifaces.begin();ssi!=base_vifaces.end();++ssi){
2910 if(rts_hload.count((*ssi))){
2911 string iface_name = (*ssi);
2912 string iface_number = "";
2913 for(j=iface_name.size()-1; j>0 && iface_number == ""; j--){
2914 if(isdigit(iface_name[j])){
2915 iface_number = iface_name[j];
2916 if(j>0 && isdigit(iface_name[j-1]))
2917 iface_number = iface_name[j-1] + iface_number;
2921 fprintf(outfl,"dagconfig -d%s -S hat_range=",iface_number.c_str());
2922 vector<int> halloc = rts_hload[iface_name];
2924 for(j=0;j<halloc.size();++j){
2927 fprintf(outfl,"%d-%d",prev_limit,halloc[j]);
2928 prev_limit = halloc[j];
2930 fprintf(outfl,"\n");
2934 system("chmod +x set_vinterface_hash.bat");
2938 // Code for implementing a local schema
2940 table_list qpSchema;
2942 // Load the schemas of any LFTAs.
2944 for(l=0;l<hfta_nbr;++l){
2945 stream_query *sq0 = split_queries[l];
2946 table_def *td = sq0->get_output_tabledef();
2947 qpSchema.append_table(td);
2949 // load the schemas of any other ref'd tables.
2951 vector<tablevar_t *> input_tbl_names = split_queries[hfta_nbr]->get_input_tables();
2953 for(ti=0;ti<input_tbl_names.size();++ti){
2954 int tbl_ref = qpSchema.get_table_ref(input_tbl_names[ti]->get_schema_name());
2956 tbl_ref = Schema->get_table_ref(input_tbl_names[ti]->get_schema_name());
2958 fprintf(stderr,"ERROR file %s references table %s, which is not in the schema.\n",input_file_names[i].c_str(), (input_tbl_names[ti]->get_schema_name()).c_str());
2961 qpSchema.append_table(Schema->get_table(tbl_ref));
2966 // Functions related to parsing.
2969 static int split_string(char *instr,char sep, char **words,int max_words){
2975 words[nwords++] = str;
2976 while( (loc = strchr(str,sep)) != NULL){
2979 if(nwords >= max_words){
2980 fprintf(stderr,"Error in split_string, too many words discovered (max is %d)\n",max_words);
2981 nwords = max_words-1;
2983 words[nwords++] = str;