1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
16 #include<unistd.h> // for gethostname
19 #include "parse_fta.h"
20 #include "parse_schema.h"
21 #include "parse_ext_fcns.h"
22 #include"analyze_fta.h"
23 #include"query_plan.h"
24 #include"generate_lfta_code.h"
25 #include"stream_query.h"
26 #include"generate_utils.h"
28 #include"generate_nic_code.h"
39 #include <sys/types.h>
45 // to verify that some files exist.
46 #include <sys/types.h>
49 #include "parse_partn.h"
51 #include "print_plan.h"
53 // Interface to the xml parser
56 #include"field_list.h"
// Interface to the generated XML lexer/parser (used for ifres.xml and
// field_list.xml; see xmlParser_setfileinput / xmlParserparse calls in main).
extern int xmlParserparse(void);
extern FILE *xmlParserin;
extern int xmlParserdebug;

// Scratch state shared with the XML parser — presumably filled in by its
// semantic actions (attribute/value pairs); confirm against the grammar file.
std::vector<std::string> xml_attr_vec;
std::vector<std::string> xml_val_vec;
std::string xml_a, xml_v;
// Parsed XML tree; allocated in main() only when field_list.xml is found.
xml_t *xml_leaves = NULL;

// Interface to the field list verifier.
// Constructed from xml_leaves when field_list.xml exists; NULL means
// "no field verification".
field_list *field_verifier = NULL;

// Size of the general-purpose scratch character buffer (used for
// gethostname(), fgets() line reads, query-name extraction, etc.).
#define TMPSTRLEN 1000

// Path separator used when deriving a default query name from an input
// file path (see the strrchr(tmpstr, PATH_DELIM) call in main).
#define PATH_DELIM '/'

// Scratch buffer for schema text (use not visible in this excerpt).
char tmp_schema_str[10000];

// maximum delay between two heartbeats produced
// by a UDOP. Used when it is not explicitly
// provided in the udop definition
#define DEFAULT_UDOP_LIVENESS_TIMEOUT 5

// Default lfta hash table size, must be power of 2.
int DEFAULT_LFTA_HASH_TABLE_SIZE = 4096;
// Interface to FTA definition lexer and parser ...
extern int FtaParserparse(void);
extern FILE *FtaParserin;
extern int FtaParserdebug;
// Result of the most recent FtaParserparse() call; parse_type distinguishes
// a table-schema parse (TABLE_PARSE) from a query parse (QUERY_PARSE).
fta_parse_t *fta_parse_result;
// Variables from a DEFINE block, if any — NOTE(review): not referenced in
// this excerpt; confirm usage elsewhere in the file.
var_defs_t *fta_parse_defines;

// Interface to external function lexer and parser ...
extern int Ext_fcnsParserparse(void);
extern FILE *Ext_fcnsParserin;
extern int Ext_fcnsParserdebug;
// External (user-defined) function signatures parsed from external_fcns.def;
// an empty list is substituted when that file is missing or fails to parse.
ext_fcn_list *Ext_fcns;

// Interface to partition definition parser
// (presumably fed from partition.txt in partitioned (-p) mode — confirm.)
extern int PartnParserparse();
partn_def_list_t *partn_parse_result = NULL;
// forward declaration of local utility function.
// Emits the Makefile plus the runit/stopit scripts when the -M option
// is given (see the usage string in main).
void generate_makefile(vector<string> &input_file_names, int nfiles,
		vector<string> &hfta_names, opview_set &opviews,
		vector<string> &machine_names,
		string schema_file_name,
		vector<string> &interface_names,
		ifq_t *ifdb, string &config_dir_path,
		map<string, vector<int> > &rts_hload
//static int split_string(char *instr,char sep, char **words,int max_words);

// Handle for schema_summary.txt, opened in main() when -f is requested;
// NULL when no schema summary output is wanted.  Written by dump_summary().
FILE *schema_summary_output = NULL; // schema summary file
// Dump schema summary
// Appends one three-line record describing a query's output schema to
// schema_summary_output (schema_summary.txt, opened when -f is given):
//   line 1: the query name
//   line 2: '|'-separated output field names
//   line 3: '|'-separated output field types
// Precondition: schema_summary_output is non-NULL.
void dump_summary(stream_query *str){
	fprintf(schema_summary_output,"%s\n",str->query_name.c_str());
	table_def *sch = str->get_output_tabledef();	// the query's output schema
	vector<field_entry *> flds = sch->get_fields();
	// f indexes the output fields; '|' separates entries within a line.
	for(f=0;f<flds.size();++f){
		if(f>0) fprintf(schema_summary_output,"|");
		fprintf(schema_summary_output,"%s",flds[f]->get_name().c_str());
	fprintf(schema_summary_output,"\n");
	// Second pass: same field order, emitting the type of each field.
	for(f=0;f<flds.size();++f){
		if(f>0) fprintf(schema_summary_output,"|");
		fprintf(schema_summary_output,"%s",flds[f]->get_type().c_str());
	fprintf(schema_summary_output,"\n");
string hostname;		// name of current host; may be overridden by -h.
bool generate_stats = false;	// true when LFTA statistics are requested (-S; alters Makefile).
string root_path = "../..";	// path to the root of GS-lite; the -R option overrides it.
160 int main(int argc, char **argv){
161 char tmpstr[TMPSTRLEN];
165 set<int>::iterator si;
167 vector<string> query_names; // for lfta.c registration
168 map<string, vector<int> > mach_query_names; // list queries of machine
169 vector<int> snap_lengths; // for lfta.c registration
170 vector<string> interface_names; // for lfta.c registration
171 vector<string> machine_names; // machine of interface
172 vector<bool> lfta_reuse_options; // for lfta.c registration
173 vector<int> lfta_liveness_timeouts; // fot qtree.xml generation
174 vector<string> hfta_names; // hfta cource code names, for
175 // creating make file.
176 vector<string> qnames; // ensure unique names
177 map<string, int> lfta_names; // keep track of unique lftas.
180 // set these to 1 to debug the parser
182 Ext_fcnsParserdebug = 0;
184 FILE *lfta_out; // lfta.c output.
185 FILE *fta_in; // input file
186 FILE *table_schemas_in; // source tables definition file
187 FILE *query_name_output; // query names
188 FILE *qtree_output; // interconnections of query nodes
190 // -------------------------------
191 // Handling of Input Arguments
192 // -------------------------------
193 char optstr[] = "BDpLC:l:HNQMf:PSh:n:cR:";
194 const char *usage_str = "Usage: %s [-B] [-D] [-p] [-L] [-N] [-H] [-Q] [-M] [-C <config directory>] [-l <library directory>] [-f] [-P] [-S] [-n n_virtual_interfaces] [-h hostname] [-R root_path] [schema_file] input_file [input file ...]\n"
195 "\t[-B] : debug only (don't create output files)\n"
196 "\t[-D] : distributed mode (will use cluster.ifq instead of local .ifq file)\n"
197 "\t[-p] : partitioned mode (will use partition information in partition.txt to drive query optimization)\n"
198 "\t[-L] : use live_hosts.txt file to restrict queries to a set of live hosts\n"
199 "\t[-C] : use <config directory> for definition files\n"
200 "\t[-l] : use <library directory> for library queries\n"
201 "\t[-N] : output query names in query_names.txt\n"
202 "\t[-H] : create HFTA only (no schema_file)\n"
203 "\t[-Q] : use query name for hfta suffix\n"
204 "\t[-M] : generate make file and runit, stopit scripts\n"
205 "\t[-S] : enable LFTA statistics (alters Makefile).\n"
206 "\t[-f] : Output schema summary to schema_summary.txt\n"
207 "\t[-P] : link with PADS\n"
208 "\t[-h] : override host name.\n"
209 "\t[-c] : clean out Makefile and hfta_*.cc first.\n"
210 "\t[-R] : path to root of GS-lite\n"
213 // parameters gathered from command line processing
214 string external_fcns_path;
215 // string internal_fcn_path;
216 string config_dir_path;
217 string library_path = "./";
218 vector<string> input_file_names;
219 string schema_file_name;
220 bool debug_only = false;
221 bool hfta_only = false;
222 bool output_query_names = false;
223 bool output_schema_summary=false;
224 bool numeric_hfta_flname = true;
225 bool create_makefile = false;
226 bool distributed_mode = false;
227 bool partitioned_mode = false;
228 bool use_live_hosts_file = false;
229 bool use_pads = false;
230 bool clean_make = false;
231 int n_virtual_interfaces = 1;
234 while((chopt = getopt(argc,argv,optstr)) != -1){
240 distributed_mode = true;
243 partitioned_mode = true;
246 use_live_hosts_file = true;
250 config_dir_path = string(optarg) + string("/");
254 library_path = string(optarg) + string("/");
257 output_query_names = true;
260 numeric_hfta_flname = false;
263 if(schema_file_name == ""){
268 output_schema_summary=true;
271 create_makefile=true;
292 n_virtual_interfaces = atoi(optarg);
293 if(n_virtual_interfaces < 1 || n_virtual_interfaces > 128){
294 fprintf(stderr,"Warning, %d virtual interfaces specified, valid values are between 1 and 128 inclusive\n",n_virtual_interfaces);
295 n_virtual_interfaces = 1;
300 fprintf(stderr,"Error, argument %c not recognized.\n",optopt);
301 fprintf(stderr,"%s\n", usage_str);
304 fprintf(stderr, "Argument was %c\n", optopt);
305 fprintf(stderr,"Invalid arguments\n");
306 fprintf(stderr,"%s\n", usage_str);
312 for (int i = 0; i < argc; ++i) {
313 if((schema_file_name == "") && !hfta_only){
314 schema_file_name = argv[i];
316 input_file_names.push_back(argv[i]);
320 if(input_file_names.size() == 0){
321 fprintf(stderr,"%s\n", usage_str);
326 string clean_cmd = "rm Makefile hfta_*.cc";
327 int clean_ret = system(clean_cmd.c_str());
329 fprintf(stderr,"Warning, return value %d when trying to clean out old source files.\n", clean_ret);
334 nic_prop_db *npdb = new nic_prop_db(config_dir_path);
336 // Open globally used file names.
338 // prepend config directory to schema file
339 schema_file_name = config_dir_path + schema_file_name;
340 external_fcns_path = config_dir_path + string("external_fcns.def");
341 string ifx_fname = config_dir_path + string("ifres.xml");
343 // Find interface query file(s).
345 gethostname(tmpstr,TMPSTRLEN);
348 hostname_len = strlen(tmpstr);
349 string ifq_fname = config_dir_path + (distributed_mode ? "cluster" : hostname) + string(".ifq");
350 vector<string> ifq_fls;
352 ifq_fls.push_back(ifq_fname);
355 // Get the field list, if it exists
356 string flist_fl = config_dir_path + "field_list.xml";
358 if((flf_in = fopen(flist_fl.c_str(), "r")) != NULL) {
359 fprintf(stderr,"Found field list file %s\n",flist_fl.c_str());
360 xml_leaves = new xml_t();
361 xmlParser_setfileinput(flf_in);
362 if(xmlParserparse()){
363 fprintf(stderr,"WARNING, could not parse field list file %s\n",flist_fl.c_str());
365 field_verifier = new field_list(xml_leaves);
370 if((table_schemas_in = fopen(schema_file_name.c_str(), "r")) == NULL) {
371 fprintf(stderr,"Can't open schema file %s\n%s\n",schema_file_name.c_str(),strerror(errno));
377 if(!(debug_only || hfta_only)){
378 if((lfta_out = fopen("lfta.c","w")) == NULL){
379 fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
385 // Get the output specification file.
387 // query, operator, operator_param, directory, bucketwidth, partitioning_se, n_partitions
388 string ospec_fl = "output_spec.cfg";
390 vector<ospec_str *> output_specs;
391 multimap<string, int> qname_to_ospec;
392 if((osp_in = fopen(ospec_fl.c_str(), "r")) != NULL) {
395 while(fgets(tmpstr,TMPSTRLEN,osp_in)){
397 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
399 // make operator type lowercase
401 for(tmpc=flds[1];*tmpc!='\0';++tmpc)
402 *tmpc = tolower(*tmpc);
404 ospec_str *tmp_ospec = new ospec_str();
405 tmp_ospec->query = flds[0];
406 tmp_ospec->operator_type = flds[1];
407 tmp_ospec->operator_param = flds[2];
408 tmp_ospec->output_directory = flds[3];
409 tmp_ospec->bucketwidth = atoi(flds[4]);
410 tmp_ospec->partitioning_flds = flds[5];
411 tmp_ospec->n_partitions = atoi(flds[6]);
412 qname_to_ospec.insert(pair<string,int>(tmp_ospec->query,output_specs.size()));
413 output_specs.push_back(tmp_ospec);
415 fprintf(stderr,"Warning, line %d corrupted in output_spec.cfg, has %d fields.\n",o_lineno,nflds);
420 fprintf(stderr,"output_spec.cfg not found. The query set has no output. exiting.\n");
425 string pspec_fl = "hfta_parallelism.cfg";
427 map<string, int> hfta_parallelism;
428 if((psp_in = fopen(pspec_fl.c_str(), "r")) != NULL){
431 while(fgets(tmpstr,TMPSTRLEN,psp_in)){
432 bool good_entry = true;
434 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
436 string hname = flds[0];
437 int par = atoi(flds[1]);
438 if(par <= 0 || par > n_virtual_interfaces){
439 fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must be between 1 and %d\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
442 if(good_entry && n_virtual_interfaces % par != 0){
443 fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must divide the number of virtual interfaces (%d), ignoring.\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
447 hfta_parallelism[hname] = par;
451 fprintf(stderr,"WARNING, no file %s found, using single copies of hftas.\n",pspec_fl.c_str());
455 // LFTA hash table sizes
456 string htspec_fl = "lfta_htsize.cfg";
457 FILE *htsp_in = NULL;
458 map<string, int> lfta_htsize;
459 if((htsp_in = fopen(htspec_fl.c_str(), "r")) != NULL){
462 while(fgets(tmpstr,TMPSTRLEN,htsp_in)){
463 bool good_entry = true;
465 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
467 string lfta_name = flds[0];
468 int htsz = atoi(flds[1]);
470 lfta_htsize[lfta_name] = htsz;
472 fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, htsize is %d, must be larger than 0.\n",o_lineno,htspec_fl.c_str(),htsz);
477 fprintf(stderr,"WARNING, no file %s found, using default LFTA hash table sizes.\n",htspec_fl.c_str());
// LFTA virtual interface hash split
481 string rtlspec_fl = "rts_load.cfg";
483 map<string, vector<int> > rts_hload;
484 if((rtl_in = fopen(rtlspec_fl.c_str(), "r")) != NULL){
489 while(fgets(tmpstr,TMPSTRLEN,rtl_in)){
490 bool good_entry = true;
494 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
496 iface_name = flds[0];
499 for(j=1;j<nflds;++j){
500 int h = atoi(flds[j]);
504 hload.push_back(cumm_h);
510 rts_hload[iface_name] = hload;
512 fprintf(stderr,"Warning, line %d of %s is malformed, skipping.\n",r_lineno, rtlspec_fl.c_str());
519 if(output_query_names){
520 if((query_name_output = fopen("query_names.txt","w")) == NULL){
521 fprintf(stderr,"Can't open output file %s\n%s\n","query_names.txt",strerror(errno));
526 if(output_schema_summary){
527 if((schema_summary_output = fopen("schema_summary.txt","w")) == NULL){
528 fprintf(stderr,"Can't open output file %s\n%s\n","schema_summary.txt",strerror(errno));
533 if((qtree_output = fopen("qtree.xml","w")) == NULL){
534 fprintf(stderr,"Can't open output file %s\n%s\n","qtree.xml",strerror(errno));
537 fprintf(qtree_output,"<?xml version='1.0' encoding='ISO-8859-1'?>\n");
538 fprintf(qtree_output,"<?xml-stylesheet type='text/xsl' href='qtree.xsl'?>\n");
539 fprintf(qtree_output,"<QueryNodes>\n");
542 // Get an initial Schema
545 // Parse the table schema definitions.
546 fta_parse_result = new fta_parse_t();
547 FtaParser_setfileinput(table_schemas_in);
548 if(FtaParserparse()){
549 fprintf(stderr,"Table schema parse failed.\n");
552 if(fta_parse_result->parse_type != TABLE_PARSE){
553 fprintf(stderr,"ERROR, file %s is not a table definition file.\n",schema_file_name.c_str());
556 Schema = fta_parse_result->tables;
558 // Process schema field inheritance
560 retval = Schema->unroll_tables(err_str);
562 fprintf(stderr,"Error processing schema filed inheritance:\n %s\n", err_str.c_str() );
566 // hfta only => we will try to fetch schemas from the registry.
567 // therefore, start off with an empty schema.
568 Schema = new table_list();
572 // Open and parse the external functions file.
573 Ext_fcnsParserin = fopen(external_fcns_path.c_str(),"r");
574 if(Ext_fcnsParserin == NULL){
575 fprintf(stderr,"Warning, can't find external functions definition file (external_fcns.def).\n");
576 Ext_fcns = new ext_fcn_list();
578 if(Ext_fcnsParserparse()){
579 fprintf(stderr,"Warning, error parsing external functions definition file (external_fcns.def).\n");
580 Ext_fcns = new ext_fcn_list();
583 if(Ext_fcns->validate_fcns(err_str)){
584 fprintf(stderr,"Error processing external functions definition file (external_fcns.def):\n%s\n",err_str.c_str());
588 // Open and parse the interface resources file.
589 // ifq_t *ifaces_db = new ifq_t();
591 // if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
592 // fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
593 // ifx_fname.c_str(), ierr.c_str());
596 // if(ifaces_db->load_ifqs(ifq_fname, ierr)){
597 // fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
598 // ifq_fname.c_str(), ierr.c_str());
603 // The LFTA code string.
604 // Put the standard preamble here.
605 // NOTE: the hash macros, fcns should go into the run time
606 map<string, string> lfta_val;
607 map<string, string> lfta_prefilter_val;
610 "#include <limits.h>\n\n"
611 "#include \"rts.h\"\n"
612 "#include \"fta.h\"\n"
613 "#include \"lapp.h\"\n"
614 "#include \"rts_udaf.h\"\n\n"
616 // Get any locally defined parsing headers
618 memset(&glob_result, 0, sizeof(glob_result));
620 // do the glob operation
621 int return_value = glob("../../include/lfta/local/*h", GLOB_TILDE, NULL, &glob_result);
622 if(return_value == 0){
623 for(size_t i = 0; i < glob_result.gl_pathc; ++i) {
625 int nflds = split_string(glob_result.gl_pathv[i],'/',flds,1000);
626 lfta_header += "#include \"local/"+string(flds[nflds-1])+"\"\n\n";
629 fprintf(stderr,"Warning, glob on ../../include/lfta/local/*h failed.\n");
633 "#define IS_FILLED(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> ((bucket & 15)<<1)))\n"
634 "#define IS_NEW(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> (((bucket & 15) << 1) + 1)))\n"
635 "#define SET_EMPTY(bitmap,bucket) (bitmap[bucket >> 4] &= (~(0x80000000 >> ((bucket & 15)<<1))))\n"
636 "#define SET_FILLED_AND_NEW(bitmap,bucket) (bitmap[bucket >> 4] |= (0xC0000000 >> ((bucket & 15)<<1)))\n"
641 "gs_uint64_t (*lfta_prefilter)(void *pkt) = NULL; // set at fta_init\n"
643 "#define SLOT_FILLED 0x04\n"
644 "#define SLOT_GEN_BITS 0x03\n"
645 "#define SLOT_HASH_BITS 0xfffffff8\n"
646 "#define SET_BF_BIT(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num)) + (bf_index)) >> 3] |= (0x80 >> ((((bucket) * (bf_num)) + (bf_index)) & 7)))\n"
647 "#define IS_BF_SET(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num))+(bf_index)) >> 3] & (0x80 >> ((((bucket) * (bf_num))+(bf_index)) & 7)))\n"
648 "#define SET_BF_EMPTY(bf_table, bf_num, bf_index, bucket) (bf_table[((bucket * bf_num)+bf_index) >> 3] &= (~0x80 >> (((bucket * bf_num)+bf_index) & 7)))\n"
651 "#define lfta_BOOL_to_hash(x) (x)\n"
652 "#define lfta_USHORT_to_hash(x) (x)\n"
653 "#define lfta_UINT_to_hash(x) (x)\n"
654 "#define lfta_IP_to_hash(x) (x)\n"
655 "#define lfta_IPV6_to_hash(x) ( (x.v[0]) ^ (x.v[1]) ^ (x.v[2]) ^ (x.v[3]))\n"
656 "#define lfta_INT_to_hash(x) (gs_uint32_t)(x)\n"
657 "#define lfta_ULLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
658 "#define lfta_LLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
659 "#define lfta_FLOAT_to_hash(x) ( (( ((gs_uint64_t)(x)) >>32)&0xffffffff) ^ (((gs_uint64_t)(x))&0xffffffff) )\n"
660 "static gs_uint32_t lfta_V_STR_to_hash(struct gs_string x){\n"
661 " gs_uint32_t i,ret=0,tmp_sum = 0;\n"
662 " for(i=0;i<x.length;++i){\n"
663 " tmp_sum |= (x.data[i]) << (8*(i%4));\n"
669 " if((i%4)!=0) ret ^=tmp_sum;\n"
675 //////////////////////////////////////////////////////////////////
676 ///// Get all of the query parse trees
680 int hfta_count = 0; // for numeric suffixes to hfta .cc files
682 //---------------------------
683 // Global info needed for post processing.
685 // Set of operator views ref'd in the query set.
687 // lftas on a per-machine basis.
688 map<string, vector<stream_query *> > lfta_mach_lists;
689 int nfiles = input_file_names.size();
690 vector<stream_query *> hfta_list; // list of hftas.
691 map<string, stream_query *> sq_map; // map from query name to stream query.
694 //////////////////////////////////////////
696 // Open and parse the interface resources file.
697 ifq_t *ifaces_db = new ifq_t();
699 if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
700 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
701 ifx_fname.c_str(), ierr.c_str());
704 if(ifaces_db->load_ifqs(ifq_fls[0], ierr)){
705 fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
706 ifq_fls[0].c_str(), ierr.c_str());
710 map<string, string> qname_to_flname; // for detecting duplicate query names
714 // Parse the files to create a vector of parse trees.
715 // Load qnodes with information to perform a topo sort
716 // based on query dependencies.
717 vector<query_node *> qnodes; // for topo sort.
718 map<string,int> name_node_map; // map query name to qnodes entry
719 for(i=0;i<input_file_names.size();i++){
721 if((fta_in = fopen(input_file_names[i].c_str(), "r")) == NULL) {
722 fprintf(stderr,"Can't open input file %s\n%s",input_file_names[i].c_str(),strerror(errno));
725 fprintf(stderr,"Parsing file %s\n",input_file_names[i].c_str());
727 // Parse the FTA query
728 fta_parse_result = new fta_parse_t();
729 FtaParser_setfileinput(fta_in);
730 if(FtaParserparse()){
731 fprintf(stderr,"FTA parse failed.\n");
734 if(fta_parse_result->parse_type != QUERY_PARSE){
735 fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
739 // returns a list of parse trees
740 vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
741 for(p=0;p<qlist.size();++p){
742 table_exp_t *fta_parse_tree = qlist[p];
743 // query_parse_trees.push_back(fta_parse_tree);
745 // compute the default name -- extract from query name
746 strcpy(tmpstr,input_file_names[i].c_str());
747 char *qname = strrchr(tmpstr,PATH_DELIM);
752 char *qname_end = strchr(qname,'.');
753 if(qname_end != NULL) *qname_end = '\0';
754 string qname_str = qname;
755 string imputed_qname = impute_query_name(fta_parse_tree, qname_str);
// Determine visibility. Should I be attaching all of the output methods?
758 if(qname_to_ospec.count(imputed_qname)>0)
759 fta_parse_tree->set_visible(true);
761 fta_parse_tree->set_visible(false);
// Create a manipulable representation of the parse tree.
765 // the qnode inherits the visibility assigned to the parse tree.
766 int pos = qnodes.size();
767 qnodes.push_back( new query_node(pos, imputed_qname, input_file_names[i], fta_parse_tree ));
768 name_node_map[ qnodes[pos]->name ] = pos;
769 //printf("name_node_map[ %s ] = %d\n",qnodes[pos]->name.c_str(),pos);
770 // qnames.push_back(impute_query_name(fta_parse_tree, qname_str));
771 // qfiles.push_back(i);
773 // Check for duplicate query names
774 // NOTE : in hfta-only generation, I should
775 // also check with the names of the registered queries.
776 if(qname_to_flname.count(qnodes[pos]->name) > 0){
777 fprintf(stderr,"ERROR duplicate query name %s in files %s and %s.\n",
778 qnodes[pos]->name.c_str(), (qname_to_flname[qnodes[pos]->name]).c_str(), input_file_names[i].c_str());
781 if(Schema->find_tbl(qnodes[pos]->name) >= 0){
782 fprintf(stderr,"ERROR duplicate query name %s (file %s) was already defined as a PROTOCOL.\n",
783 qnodes[pos]->name.c_str(), input_file_names[i].c_str());
786 qname_to_flname[qnodes[pos]->name] = input_file_names[i].c_str();
792 // Add the library queries
795 for(pos=0;pos<qnodes.size();++pos){
797 for(fi = 0;fi<qnodes[pos]->refd_tbls.size();++fi){
798 string src_tbl = qnodes[pos]->refd_tbls[fi];
799 if(qname_to_flname.count(src_tbl) == 0){
800 int last_sep = src_tbl.find_last_of('/');
801 if(last_sep != string::npos){
802 fprintf(stderr,"Finding library query %s for query %s\n",src_tbl.c_str(),qnodes[pos]->name.c_str());
803 string target_qname = src_tbl.substr(last_sep+1);
804 string qpathname = library_path + src_tbl + ".gsql";
805 if((fta_in = fopen(qpathname.c_str(), "r")) == NULL) {
806 fprintf(stderr,"Can't open library file %s, referenced by query %s in file %s\n\t%s\n",qpathname.c_str(),qnodes[pos]->name.c_str(), qname_to_flname[qnodes[pos]->name].c_str(), strerror(errno));
808 fprintf(stderr,"After exit\n");
810 fprintf(stderr,"Parsing file %s\n",qpathname.c_str());
811 // Parse the FTA query
812 fta_parse_result = new fta_parse_t();
813 FtaParser_setfileinput(fta_in);
814 if(FtaParserparse()){
815 fprintf(stderr,"FTA parse failed.\n");
818 if(fta_parse_result->parse_type != QUERY_PARSE){
819 fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
823 map<string, int> local_query_map;
824 vector<string> local_query_names;
825 vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
826 for(p=0;p<qlist.size();++p){
827 table_exp_t *fta_parse_tree = qlist[p];
828 fta_parse_tree->set_visible(false); // assumed to not produce output
829 string imputed_qname = impute_query_name(fta_parse_tree, target_qname);
830 if(imputed_qname == target_qname)
831 imputed_qname = src_tbl;
832 if(local_query_map.count(imputed_qname)>0){
833 fprintf(stderr,"ERROR, multiple queries named %s in library file %s\n",imputed_qname.c_str(), qpathname.c_str());
836 local_query_map[ imputed_qname ] = p;
837 local_query_names.push_back(imputed_qname);
840 if(local_query_map.count(src_tbl)==0){
841 fprintf(stderr,"ERROR, library query file %s has no query named %s\n",qpathname.c_str(),target_qname.c_str());
845 vector<int> worklist;
846 set<int> added_queries;
847 vector<query_node *> new_qnodes;
848 worklist.push_back(local_query_map[target_qname]);
849 added_queries.insert(local_query_map[target_qname]);
851 int qpos = qnodes.size();
852 for(qq=0;qq<worklist.size();++qq){
853 int q_id = worklist[qq];
854 query_node *new_qnode = new query_node(qpos+qq, local_query_names[q_id], qpathname, qlist[q_id] );
855 new_qnodes.push_back( new_qnode);
856 vector<string> refd_tbls = new_qnode->refd_tbls;
858 for(ff = 0;ff<refd_tbls.size();++ff){
859 if(local_query_map.count(refd_tbls[ff])>0 && added_queries.count(local_query_map[refd_tbls[ff]])==0){
861 if(name_node_map.count(refd_tbls[ff])>0){
862 fprintf(stderr,"ERROR, query %s occurs both in the regular query set, file %s, and in library file %s\n",refd_tbls[ff].c_str(), qname_to_flname[refd_tbls[ff]].c_str(), qpathname.c_str() );
865 worklist.push_back(local_query_map[refd_tbls[ff]]);
871 for(qq=0;qq<new_qnodes.size();++qq){
872 int qpos = qnodes.size();
873 qnodes.push_back(new_qnodes[qq]);
874 name_node_map[qnodes[qpos]->name ] = qpos;
875 qname_to_flname[qnodes[qpos]->name ] = qpathname;
889 //---------------------------------------
894 string udop_missing_sources;
895 for(i=0;i<qnodes.size();++i){
897 for(fi = 0;fi<qnodes[i]->refd_tbls.size();++fi){
898 int sid = Schema->find_tbl(qnodes[i]->refd_tbls[fi]);
900 if(Schema->get_schema_type(sid) == OPERATOR_VIEW_SCHEMA){
901 if(name_node_map.count(qnodes[i]->refd_tbls[fi]) == 0){
902 int pos = qnodes.size();
903 qnodes.push_back( new query_node(pos, qnodes[i]->refd_tbls[fi], Schema));
904 name_node_map[ qnodes[pos]->name ] = pos;
905 qnodes[pos]->is_externally_visible = false; // its visible
906 // Need to mark the source queries as visible.
908 string missing_sources = "";
909 for(si=0;si<qnodes[pos]->refd_tbls.size();++si){
910 string src_tbl = qnodes[pos]->refd_tbls[si];
911 if(name_node_map.count(src_tbl)==0){
912 missing_sources += src_tbl + " ";
915 if(missing_sources != ""){
916 udop_missing_sources += "\tUDOP "+qnodes[pos]->name+" references undefined tables "+missing_sources+"\n";
923 if(udop_missing_sources != ""){
924 fprintf(stderr,"ERROR, User Defined OPerators reference source tables that are not part of the query set:\n%s",udop_missing_sources.c_str());
930 ////////////////////////////////////////////////////////////////////
931 /// Check parse trees to verify that some
932 /// global properties are met :
933 /// if q1 reads from q2, then
934 /// q2 is processed before q1
935 /// q1 can supply q2's parameters
936 /// Verify there is no cycle in the reads-from graph.
938 // Compute an order in which to process the
941 // Start by building the reads-from lists.
944 for(i=0;i<qnodes.size();++i){
946 vector<string> refd_tbls = qnodes[i]->refd_tbls;
947 for(fi = 0;fi<refd_tbls.size();++fi){
948 if(name_node_map.count(refd_tbls[fi])>0){
949 //printf("query %d (%s) reads from %s (%d)\n", i, qnodes[i]->name.c_str(),refd_tbls[fi].c_str(),name_node_map[refd_tbls[fi]]);
950 (qnodes[i]->reads_from).insert(name_node_map[refd_tbls[fi]]);
956 // If one query reads the result of another,
957 // check for parameter compatibility. Currently it must
958 // be an exact match. I will move to requiring
959 // containment after re-ordering, but will require
960 // some analysis for code generation which is not
962 //printf("There are %d query nodes.\n",qnodes.size());
965 for(i=0;i<qnodes.size();++i){
966 vector<var_pair_t *> target_params = qnodes[i]->params;
967 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
968 vector<var_pair_t *> source_params = qnodes[(*si)]->params;
969 if(target_params.size() != source_params.size()){
970 fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
974 for(p=0;p<target_params.size();++p){
975 if(! (target_params[p]->name == source_params[p]->name &&
976 target_params[p]->val == source_params[p]->val ) ){
977 fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
986 // Start by counting inedges.
987 for(i=0;i<qnodes.size();++i){
988 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
989 qnodes[(*si)]->n_consumers++;
993 // The roots are the nodes with indegree zero.
995 for(i=0;i<qnodes.size();++i){
996 if(qnodes[i]->n_consumers == 0){
997 if(qnodes[i]->is_externally_visible == false){
998 fprintf(stderr,"WARNING: query %s (file %s) is a root query but it isn't visible. Ignored.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str());
1004 // Remove the parts of the subtree that produce no output.
1005 set<int> valid_roots;
1006 set<int> discarded_nodes;
1007 set<int> candidates;
1008 while(roots.size() >0){
1009 for(si=roots.begin();si!=roots.end();++si){
1010 if(qnodes[(*si)]->is_externally_visible){
1011 valid_roots.insert((*si));
1013 discarded_nodes.insert((*si));
1014 set<int>::iterator sir;
1015 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1016 qnodes[(*sir)]->n_consumers--;
1017 if(qnodes[(*sir)]->n_consumers == 0)
1018 candidates.insert( (*sir));
1025 roots = valid_roots;
1026 if(discarded_nodes.size()>0){
1027 fprintf(stderr,"Warning, the following queries were discarded because they produce no output:\n");
1029 for(si=discarded_nodes.begin();si!=discarded_nodes.end();++si){
1030 if(di>0 && (di%8)==0) fprintf(stderr,"\n");
1032 fprintf(stderr," %s",qnodes[(*si)]->name.c_str());
1034 fprintf(stderr,"\n");
1037 // Compute the sources_to set, ignoring discarded nodes.
1038 for(i=0;i<qnodes.size();++i){
1039 if(discarded_nodes.count(i)==0)
1040 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1041 qnodes[(*si)]->sources_to.insert(i);
1046 // Find the nodes that are shared by multiple visible subtrees.
// The roots become inferred visible nodes.
1049 // Find the visible nodes.
1050 vector<int> visible_nodes;
1051 for(i=0;i<qnodes.size();i++){
1052 if(qnodes[i]->is_externally_visible){
1053 visible_nodes.push_back(i);
1057 // Find UDOPs referenced by visible nodes.
1059 for(i=0;i<visible_nodes.size();++i){
1060 workq.push_back(visible_nodes[i]);
1062 while(!workq.empty()){
1063 int node = workq.front();
1065 set<int>::iterator children;
1066 if(qnodes[node]->is_udop && qnodes[node]->is_externally_visible == false){
1067 qnodes[node]->is_externally_visible = true;
1068 visible_nodes.push_back(node);
1069 for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1070 if(qnodes[(*children)]->is_externally_visible == false){
1071 qnodes[(*children)]->is_externally_visible = true;
1072 visible_nodes.push_back((*children));
1076 for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1077 workq.push_back((*children));
1084 for(i=0;i<qnodes.size();i++){
1085 qnodes[i]->subtree_roots.clear();
1088 // Walk the tree defined by a visible node, not descending into
1089 // subtrees rooted by a visible node. Mark the node visited with
1090 // the visible node ID.
1091 for(i=0;i<visible_nodes.size();++i){
1093 vroots.insert(visible_nodes[i]);
1094 while(vroots.size()>0){
1095 for(si=vroots.begin();si!=vroots.end();++si){
1096 qnodes[(*si)]->subtree_roots.insert(visible_nodes[i]);
1098 set<int>::iterator sir;
1099 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1100 if(! qnodes[(*sir)]->is_externally_visible){
1101 candidates.insert( (*sir));
1105 vroots = candidates;
1109 // Find the nodes in multiple visible node subtrees, but with no parent
// that is in multiple visible node subtrees. Mark these as inferred visible nodes.
1111 done = true; // until proven otherwise
1112 for(i=0;i<qnodes.size();i++){
1113 if(qnodes[i]->subtree_roots.size()>1){
1114 bool is_new_root = true;
1115 set<int>::iterator sir;
1116 for(sir=qnodes[i]->sources_to.begin(); sir!=qnodes[i]->sources_to.end();++sir){
1117 if(qnodes[(*sir)]->subtree_roots.size()>1)
1118 is_new_root = false;
1121 qnodes[i]->is_externally_visible = true;
1122 qnodes[i]->inferred_visible_node = true;
1123 visible_nodes.push_back(i);
1134 // get visible nodes in topo ordering.
1135 // for(i=0;i<qnodes.size();i++){
1136 // qnodes[i]->n_consumers = qnodes[i]->sources_to.size();
1138 vector<int> process_order;
1139 while(roots.size() >0){
1140 for(si=roots.begin();si!=roots.end();++si){
1141 if(discarded_nodes.count((*si))==0){
1142 process_order.push_back( (*si) );
1144 set<int>::iterator sir;
1145 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1146 qnodes[(*sir)]->n_consumers--;
1147 if(qnodes[(*sir)]->n_consumers == 0)
1148 candidates.insert( (*sir));
1156 //printf("process_order.size() =%d\n",process_order.size());
1158 // Search for cyclic dependencies
1160 for(i=0;i<qnodes.size();++i){
1161 if(discarded_nodes.count(i)==0 && qnodes[i]->n_consumers > 0){
1162 if(found_dep.size() != 0) found_dep += ", ";
1163 found_dep += "query "+qnodes[i]->name+" (file "+qnodes[i]->file+")";
1166 if(found_dep.size()>0){
1167 fprintf(stderr,"ERROR, the following queries contain a cyclic reads-from dependency:\n%s\n",found_dep.c_str());
1171 // Get a list of query sets, in the order to be processed.
1172 // Start at visible root and do bfs.
1173 // The query set includes queries referenced indirectly,
1174 // as sources for user-defined operators. These are needed
1175 // to ensure that they are added to the schema, but are not part
1176 // of the query tree.
1178 // stream_node_sets contains queries reachable only through the
1179 // FROM clause, so I can tell which queries to add to the stream
1180 // query. (DISABLED, UDOPS are integrated, does this cause problems?)
1182 // NOTE: this code works because in order for data to be
1183 // read by multiple hftas, the node must be externally visible.
1184 // But visible nodes define roots of process sets.
1185 // internally visible nodes can feed data only
1186 // to other nodes in the same query file.
1187 // Therefore, any access can be restricted to a file,
1188 // hfta output sharing is done only on roots
1189 // never on interior nodes.
1194 // Compute the base collection of hftas.
1195 vector<hfta_node *> hfta_sets;
1196 map<string, int> hfta_name_map;
1197 // vector< vector<int> > process_sets;
1198 // vector< set<int> > stream_node_sets;
1199 reverse(process_order.begin(), process_order.end()); // get listing in reverse order.
1200 // i.e. process leaves 1st.
1201 for(i=0;i<process_order.size();++i){
1202 if(qnodes[process_order[i]]->is_externally_visible == true){
1203 //printf("Visible.\n");
1204 int root = process_order[i];
1205 hfta_node *hnode = new hfta_node();
1206 hnode->name = qnodes[root]-> name;
1207 hnode->source_name = qnodes[root]-> name;
1208 hnode->is_udop = qnodes[root]->is_udop;
1209 hnode->inferred_visible_node = qnodes[root]->inferred_visible_node;
1211 vector<int> proc_list; proc_list.push_back(root);
1212 // Ensure that nodes are added only once.
1213 set<int> proc_set; proc_set.insert(root);
1214 roots.clear(); roots.insert(root);
1216 while(roots.size()>0){
1217 for(si=roots.begin();si!=roots.end();++si){
1218 //printf("Processing root %d\n",(*si));
1219 set<int>::iterator sir;
1220 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1221 //printf("reads fom %d\n",(*sir));
1222 if(qnodes[(*sir)]->is_externally_visible==false){
1223 candidates.insert( (*sir) );
1224 if(proc_set.count( (*sir) )==0){
1225 proc_set.insert( (*sir) );
1226 proc_list.push_back( (*sir) );
1235 reverse(proc_list.begin(), proc_list.end());
1236 hnode->query_node_indices = proc_list;
1237 hfta_name_map[hnode->name] = hfta_sets.size();
1238 hfta_sets.push_back(hnode);
1242 // Compute the reads_from / sources_to graphs for the hftas.
1244 for(i=0;i<hfta_sets.size();++i){
1245 hfta_node *hnode = hfta_sets[i];
1246 for(q=0;q<hnode->query_node_indices.size();q++){
1247 query_node *qnode = qnodes[hnode->query_node_indices[q]];
1248 for(s=0;s<qnode->refd_tbls.size();++s){
1249 if(hfta_name_map.count(qnode->refd_tbls[s])){
1250 int other_hfta = hfta_name_map[qnode->refd_tbls[s]];
1251 hnode->reads_from.insert(other_hfta);
1252 hfta_sets[other_hfta]->sources_to.insert(i);
1258 // Compute a topological sort of the hfta_sets.
1260 vector<int> hfta_topsort;
1262 int hnode_srcs[hfta_sets.size()];
1263 for(i=0;i<hfta_sets.size();++i){
1265 if(hfta_sets[i]->sources_to.size() == 0)
1269 while(! workq.empty()){
1270 int node = workq.front();
1272 hfta_topsort.push_back(node);
1273 set<int>::iterator stsi;
1274 for(stsi=hfta_sets[node]->reads_from.begin();stsi!=hfta_sets[node]->reads_from.end();++stsi){
1275 int parent = (*stsi);
1276 hnode_srcs[parent]++;
1277 if(hnode_srcs[parent] == hfta_sets[parent]->sources_to.size()){
1278 workq.push_back(parent);
1283 // Decorate hfta nodes with the level of parallelism given as input.
1285 map<string, int>::iterator msii;
1286 for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1287 string hfta_name = (*msii).first;
1288 int par = (*msii).second;
1289 if(hfta_name_map.count(hfta_name) > 0){
1290 hfta_sets[ hfta_name_map[hfta_name] ]->n_parallel = par;
1292 fprintf(stderr,"Warning, hfta_parallelism.cfg has an entry for %s, but its not a hfta.\n",hfta_name.c_str());
1296 // Propagate levels of parallelism: children should have a level of parallelism
1297 // as large as any of its parents. Adjust children upwards to compensate.
1298 // Start at parents and adjust children, auto-propagation will occur.
1300 for(i=hfta_sets.size()-1;i>=0;i--){
1301 set<int>::iterator stsi;
1302 for(stsi=hfta_sets[i]->reads_from.begin();stsi!=hfta_sets[i]->reads_from.end();++stsi){
1303 if(hfta_sets[i]->n_parallel > hfta_sets[ (*stsi) ]->n_parallel){
1304 hfta_sets[ (*stsi) ]->n_parallel = hfta_sets[i]->n_parallel;
1309 // Before all the name mangling, check if there are any output_spec.cfg
1310 // or hfta_parallelism.cfg entries that do not have a matching query.
1312 string dangling_ospecs = "";
1313 for(msii=qname_to_ospec.begin();msii!=qname_to_ospec.end();++msii){
1314 string oq = (*msii).first;
1315 if(hfta_name_map.count(oq) == 0){
1316 dangling_ospecs += " "+(*msii).first;
1319 if(dangling_ospecs!=""){
1320 fprintf(stderr,"WARNING, the following entries in output_spec.cfg don't have a matching query: %s\n",dangling_ospecs.c_str());
1323 string dangling_par = "";
1324 for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1325 string oq = (*msii).first;
1326 if(hfta_name_map.count(oq) == 0){
1327 dangling_par += " "+(*msii).first;
1330 if(dangling_par!=""){
1331 fprintf(stderr,"WARNING, the following entries in hfta_parallelism.cfg don't have a matching query: %s\n",dangling_par.c_str());
1336 // Replicate parallelized hftas. Do __copyX name mangling. Adjust
1337 // FROM clauses: retarget any name which is an internal node, and
1338 // any which is in hfta_sets (and which is parallelized). Add Merge nodes
1339 // when the source hfta has more parallelism than the target node.
1340 // Add new nodes to qnodes and hfta_nodes with the appropriate linkages.
1343 int n_original_hfta_sets = hfta_sets.size();
1344 for(i=0;i<n_original_hfta_sets;++i){
1345 if(hfta_sets[i]->n_parallel > 1){
1346 hfta_sets[i]->do_generation =false; // set the deletion flag for this entry.
1347 set<string> local_nodes; // names of query nodes in the hfta.
1348 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1349 local_nodes.insert(qnodes[ hfta_sets[i]->query_node_indices[h] ]->name);
1352 for(p=0;p<hfta_sets[i]->n_parallel;++p){
1353 string mangler = "__copy"+int_to_string(p);
1354 hfta_node *par_hfta = new hfta_node();
1355 par_hfta->name = hfta_sets[i]->name + mangler;
1356 par_hfta->source_name = hfta_sets[i]->name;
1357 par_hfta->is_udop = hfta_sets[i]->is_udop;
1358 par_hfta->inferred_visible_node = hfta_sets[i]->inferred_visible_node;
1359 par_hfta->n_parallel = hfta_sets[i]->n_parallel;
1360 par_hfta->parallel_idx = p;
1362 map<string, int> par_qnode_map; // qnode name-to-idx, aids dependency tracking.
1365 if(hfta_sets[i]->is_udop){
1366 int root = hfta_sets[i]->query_node_indices[0];
1368 string unequal_par_sources;
1369 set<int>::iterator rfsii;
1370 for(rfsii=hfta_sets[i]->reads_from.begin();rfsii!=hfta_sets[i]->reads_from.end();++rfsii){
1371 if(hfta_sets[(*rfsii)]->n_parallel != par_hfta->n_parallel){
1372 unequal_par_sources = hfta_sets[(*rfsii)]->name+" ("+int_to_string(hfta_sets[(*rfsii)]->n_parallel)+") ";
1375 if(unequal_par_sources != ""){
1376 fprintf(stderr,"ERROR, UDOP %s has parallelism %d, but some of its sources have a different parallelism: %s\n",hfta_sets[i]->name.c_str(), hfta_sets[i]->n_parallel, unequal_par_sources.c_str());
1381 vector<string> new_sources;
1382 for(rti=0;rti<qnodes[root]->refd_tbls.size();++rti){
1383 new_sources.push_back(qnodes[root]->refd_tbls[rti]+mangler);
1386 query_node *new_qn = new query_node(qnodes.size(), qnodes[root]->name, Schema);
1387 new_qn->name += mangler;
1388 new_qn->mangler = mangler;
1389 new_qn->refd_tbls = new_sources;
1390 par_hfta->query_node_indices.push_back(qnodes.size());
1391 par_qnode_map[new_qn->name] = qnodes.size();
1392 name_node_map[ new_qn->name ] = qnodes.size();
1393 qnodes.push_back(new_qn);
1395 // regular query node
1396 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1397 int hqn_idx = hfta_sets[i]->query_node_indices[h];
1398 table_exp_t *dup_pt = dup_table_exp(qnodes[hqn_idx]->parse_tree);
1399 // rehome the from clause on mangled names.
1400 // create merge nodes as needed for external sources.
1401 for(f=0;f<dup_pt->fm->tlist.size();++f){
1402 if(local_nodes.count(dup_pt->fm->tlist[f]->schema_name)>0){
1403 dup_pt->fm->tlist[f]->schema_name += mangler;
1404 }else if(hfta_name_map.count(dup_pt->fm->tlist[f]->schema_name)>0){
1405 // Ref's an external HFTA. No parallelism => leave as is. Else, if level of parallelism of the two hftas is the same, rename by the mangler. Else, there must be more sources, so create a merge node.
1406 int other_hidx = hfta_name_map[dup_pt->fm->tlist[f]->schema_name];
1407 if(par_hfta->n_parallel == hfta_sets[other_hidx]->n_parallel){
1408 dup_pt->fm->tlist[f]->schema_name += mangler;
1410 vector<string> src_tbls;
1411 int stride = hfta_sets[other_hidx]->n_parallel / par_hfta->n_parallel;
1413 fprintf(stderr,"INTERNAL ERROR, parent hfta %s has a larger parallelism than its child %s\n",par_hfta->name.c_str(), hfta_sets[other_hidx]->name.c_str());
1416 for(s=0;s<stride;++s){
1417 string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s+stride*p);
1418 src_tbls.push_back(ext_src_name);
1420 table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1421 string merge_node_name = qnodes[hqn_idx]->name+"__copy"+int_to_string(p)+"__merge"+int_to_string(f);
1422 dup_pt->fm->tlist[f]->schema_name = merge_node_name;
1423 // Make a qnode to represent the new merge node
1424 query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1425 qn_pt->refd_tbls = src_tbls;
1426 qn_pt->is_udop = false;
1427 qn_pt->is_externally_visible = false;
1428 qn_pt->inferred_visible_node = false;
1429 par_hfta->query_node_indices.push_back(qnodes.size());
1430 par_qnode_map[merge_node_name] = qnodes.size();
1431 name_node_map[ merge_node_name ] = qnodes.size();
1432 qnodes.push_back(qn_pt);
1436 query_node *new_qn = new query_node(qnodes.size(),qnodes[hqn_idx]->name+"__copy"+int_to_string(p),qnodes[hqn_idx]->file,dup_pt);
1437 for(f=0;f<dup_pt->fm->tlist.size();++f){
1438 new_qn->refd_tbls.push_back(dup_pt->fm->tlist[f]->schema_name);
1440 new_qn->params = qnodes[hqn_idx]->params;
1441 new_qn->is_udop = false;
1442 new_qn->is_externally_visible = qnodes[hqn_idx]->is_externally_visible;
1443 new_qn->inferred_visible_node = qnodes[hqn_idx]->inferred_visible_node;
1444 par_hfta->query_node_indices.insert(par_hfta->query_node_indices.begin(),qnodes.size());
1445 par_qnode_map[new_qn->name] = qnodes.size();
1446 name_node_map[ new_qn->name ] = qnodes.size();
1447 qnodes.push_back(new_qn);
1450 hfta_name_map[par_hfta->name] = hfta_sets.size();
1451 hfta_sets.push_back(par_hfta);
1454 // This hfta isn't being parallelized, but add merge nodes for any parallelized
1456 if(!hfta_sets[i]->is_udop){
1457 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1458 int hqn_idx = hfta_sets[i]->query_node_indices[h];
1459 for(f=0;f<qnodes[hqn_idx]->parse_tree->fm->tlist.size();++f){
1460 if(hfta_name_map.count(qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name)>0){
1461 // Ref's an external HFTA. No parallelism => leave as is. Else, if level of parallelism of the two hftas is the same, rename by the mangler. Else, there must be more sources, so create a merge node.
1462 int other_hidx = hfta_name_map[qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name];
1463 if(hfta_sets[i]->n_parallel != hfta_sets[other_hidx]->n_parallel){
1464 vector<string> src_tbls;
1465 for(s=0;s<hfta_sets[other_hidx]->n_parallel;++s){
1466 string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s);
1467 src_tbls.push_back(ext_src_name);
1469 table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1470 string merge_node_name = qnodes[hqn_idx]->name+"__merge"+int_to_string(f);
1471 qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name = merge_node_name;
1472 // Make a qnode to represent the new merge node
1473 query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1474 qn_pt->refd_tbls = src_tbls;
1475 qn_pt->is_udop = false;
1476 qn_pt->is_externally_visible = false;
1477 qn_pt->inferred_visible_node = false;
1478 hfta_sets[i]->query_node_indices.insert(hfta_sets[i]->query_node_indices.begin(),qnodes.size());
1479 name_node_map[ merge_node_name ] = qnodes.size();
1480 qnodes.push_back(qn_pt);
1489 // Rebuild the reads_from / sources_to lists in the qnodes
1490 for(q=0;q<qnodes.size();++q){
1491 qnodes[q]->reads_from.clear();
1492 qnodes[q]->sources_to.clear();
1494 for(q=0;q<qnodes.size();++q){
1495 for(s=0;s<qnodes[q]->refd_tbls.size();++s){
1496 if(name_node_map.count(qnodes[q]->refd_tbls[s])>0){
1497 int rf = name_node_map[qnodes[q]->refd_tbls[s]];
1498 qnodes[q]->reads_from.insert(rf);
1499 qnodes[rf]->sources_to.insert(q);
1504 // Rebuild the reads_from / sources_to lists in hfta_sets
1505 for(q=0;q<hfta_sets.size();++q){
1506 hfta_sets[q]->reads_from.clear();
1507 hfta_sets[q]->sources_to.clear();
1509 for(q=0;q<hfta_sets.size();++q){
1510 for(s=0;s<hfta_sets[q]->query_node_indices.size();++s){
1511 int node = hfta_sets[q]->query_node_indices[s];
1512 set<int>::iterator rfsii;
1513 for(rfsii=qnodes[node]->reads_from.begin();rfsii!=qnodes[node]->reads_from.end();++rfsii){
1514 if(hfta_name_map.count(qnodes[(*rfsii)]->name)>0){
1515 hfta_sets[q]->reads_from.insert(hfta_name_map[qnodes[(*rfsii)]->name]);
1516 hfta_sets[hfta_name_map[qnodes[(*rfsii)]->name]]->sources_to.insert(q);
1523 for(q=0;q<qnodes.size();++q){
1524 printf("qnode %d reads-from %d:",q,qnodes[q]->reads_from.size());
1525 set<int>::iterator rsii;
1526 for(rsii=qnodes[q]->reads_from.begin();rsii!=qnodes[q]->reads_from.end();++rsii)
1527 printf(" %d",(*rsii));
1528 printf(", and sources-to %d:",qnodes[q]->sources_to.size());
1529 for(rsii=qnodes[q]->sources_to.begin();rsii!=qnodes[q]->sources_to.end();++rsii)
1530 printf(" %d",(*rsii));
1534 for(q=0;q<hfta_sets.size();++q){
1535 if(hfta_sets[q]->do_generation==false)
1537 printf("hfta %d (%s) reads-from %d:",q,hfta_sets[q]->name.c_str(),hfta_sets[q]->reads_from.size());
1538 set<int>::iterator rsii;
1539 for(rsii=hfta_sets[q]->reads_from.begin();rsii!=hfta_sets[q]->reads_from.end();++rsii)
1540 printf(" %d",(*rsii));
1541 printf(", and sources-to %d:",hfta_sets[q]->sources_to.size());
1542 for(rsii=hfta_sets[q]->sources_to.begin();rsii!=hfta_sets[q]->sources_to.end();++rsii)
1543 printf(" %d",(*rsii));
1550 // Re-topo sort the hftas
1551 hfta_topsort.clear();
1553 int hnode_srcs_2[hfta_sets.size()];
1554 for(i=0;i<hfta_sets.size();++i){
1555 hnode_srcs_2[i] = 0;
1556 if(hfta_sets[i]->sources_to.size() == 0 && hfta_sets[i]->do_generation){
1561 while(workq.empty() == false){
1562 int node = workq.front();
1564 hfta_topsort.push_back(node);
1565 set<int>::iterator stsii;
1566 for(stsii=hfta_sets[node]->reads_from.begin();stsii!=hfta_sets[node]->reads_from.end();++stsii){
1567 int child = (*stsii);
1568 hnode_srcs_2[child]++;
1569 if(hnode_srcs_2[child] == hfta_sets[child]->sources_to.size()){
1570 workq.push_back(child);
1575 // Ensure that all of the query_node_indices in hfta_sets are topologically
1576 // sorted, don't rely on assumptions that all transforms maintain some kind of order.
1577 for(i=0;i<hfta_sets.size();++i){
1578 if(hfta_sets[i]->do_generation){
1579 map<int,int> n_accounted;
1580 vector<int> new_order;
1582 vector<int>::iterator vii;
1583 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1584 n_accounted[(*vii)]= 0;
1586 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1587 set<int>::iterator rfsii;
1588 for(rfsii=qnodes[(*vii)]->reads_from.begin();rfsii!=qnodes[(*vii)]->reads_from.end();++rfsii){
1589 if(n_accounted.count((*rfsii)) == 0){
1590 n_accounted[(*vii)]++;
1593 if(n_accounted[(*vii)] == qnodes[(*vii)]->reads_from.size()){
1594 workq.push_back((*vii));
1598 while(workq.empty() == false){
1599 int node = workq.front();
1601 new_order.push_back(node);
1602 set<int>::iterator stsii;
1603 for(stsii=qnodes[node]->sources_to.begin();stsii!=qnodes[node]->sources_to.end();++stsii){
1604 if(n_accounted.count((*stsii))){
1605 n_accounted[(*stsii)]++;
1606 if(n_accounted[(*stsii)] == qnodes[(*stsii)]->reads_from.size()){
1607 workq.push_back((*stsii));
1612 hfta_sets[i]->query_node_indices = new_order;
1620 /// Global checking is done, start the analysis and translation
1621 /// of the query parse tree in the order specified by process_order
1624 // Get a list of the LFTAs for global lfta optimization
1625 // TODO: separate building operators from splitting lftas,
1626 // that will make optimizations such as predicate pushing easier.
1627 vector<stream_query *> lfta_list;
1629 stream_query *rootq;
1633 for(qi=hfta_topsort.size()-1;qi>=0;--qi){
1635 int hfta_id = hfta_topsort[qi];
1636 vector<int> curr_list = hfta_sets[hfta_id]->query_node_indices;
1640 // Two possibilities, either it's a UDOP, or it's a collection of queries.
1641 // if(qnodes[curr_list.back()]->is_udop)
1642 if(hfta_sets[hfta_id]->is_udop){
1643 int node_id = curr_list.back();
1644 int udop_schref = Schema->find_tbl(qnodes[node_id]->file);
1645 opview_entry *opv = new opview_entry();
1647 // Many of the UDOP properties aren't currently used.
1648 opv->parent_qname = "no_parent";
1649 opv->root_name = qnodes[node_id]->name;
1650 opv->view_name = qnodes[node_id]->file;
1652 sprintf(tmpstr,"%s_UDOP%d_%s",qnodes[node_id]->name.c_str(),qi,opv->view_name.c_str());
1653 opv->udop_alias = tmpstr;
1654 opv->mangler = qnodes[node_id]->mangler;
1656 if(opv->mangler != ""){
1657 int new_udop_schref = Schema->add_duplicate_table(opv->view_name,opv->root_name);
1658 Schema->mangle_subq_names(new_udop_schref,opv->mangler);
1661 // This piece of code makes each hfta which refers to the same udop
1662 // reference a distinct running udop. Do this at query optimization time?
1663 // fmtbl->set_udop_alias(opv->udop_alias);
1665 opv->exec_fl = Schema->get_op_prop(udop_schref, string("file"));
1666 opv->liveness_timeout = atoi(Schema->get_op_prop(udop_schref, string("liveness_timeout")).c_str());
1668 vector<subquery_spec *> subq = Schema->get_subqueryspecs(udop_schref);
1670 for(s=0;s<subq.size();++s){
1671 // Validate that the fields match.
1672 subquery_spec *sqs = subq[s];
1673 string subq_name = sqs->name + opv->mangler;
1674 vector<field_entry *> flds = Schema->get_fields(subq_name);
1675 if(flds.size() == 0){
1676 fprintf(stderr,"INTERNAL ERROR: the subquery %s of view %s not found in Schema.\n",subq_name.c_str(), opv->view_name.c_str());
1679 if(flds.size() < sqs->types.size()){
1680 fprintf(stderr,"ERROR: subquery %s of view %s does not have enough fields (%lu found, %lu expected).\n",subq_name.c_str(), opv->view_name.c_str(),flds.size(), sqs->types.size());
1683 bool failed = false;
1684 for(f=0;f<sqs->types.size();++f){
1685 data_type dte(sqs->types[f],sqs->modifiers[f]);
1686 data_type dtf(flds[f]->get_type(),flds[f]->get_modifier_list());
1687 if(! dte.subsumes_type(&dtf) ){
1688 fprintf(stderr,"ERROR: subquery %s of view %s does not have the correct type for field %d (%s found, %s expected).\n",subq_name.c_str(), opv->view_name.c_str(),f,dtf.to_string().c_str(), dte.to_string().c_str());
1692 if(dte.is_temporal() && (dte.get_temporal() != dtf.get_temporal()) ){
1693 string pstr = dte.get_temporal_string();
1694 fprintf(stderr,"ERROR: subquery %s of view %s does not have the expected temporal value %s of field %d.\n",sqs->name.c_str(), opv->view_name.c_str(),pstr.c_str(),f);
1701 /// Validation done, find the subquery, make a copy of the
1702 /// parse tree, and add it to the return list.
1703 for(q=0;q<qnodes.size();++q)
1704 if(qnodes[q]->name == subq_name)
1706 if(q==qnodes.size()){
1707 fprintf(stderr,"INTERNAL ERROR: subquery %s of view %s not found in list of query names.\n",subq_name.c_str(), opv->view_name.c_str());
1713 // Cross-link to from entry(s) in all sourced-to tables.
1714 set<int>::iterator sii;
1715 for(sii=qnodes[curr_list.back()]->sources_to.begin() ;sii!=qnodes[curr_list.back()]->sources_to.end();++sii){
1716 //printf("\tUDOP %s sources_to %d (%s)\n",hfta_sets[hfta_id]->name.c_str(),(*sii),hfta_sets[(*sii)]->name.c_str());
1717 vector<tablevar_t *> tblvars = qnodes[(*sii)]->parse_tree->get_from()->get_table_list();
1719 for(ii=0;ii<tblvars.size();++ii){
1720 if(tblvars[ii]->schema_name == opv->root_name){
1721 tblvars[ii]->set_opview_idx(opviews.size());
1727 opviews.append(opv);
1730 // Analyze the parse trees in this query,
1731 // put them in rootq
1732 // vector<int> curr_list = process_sets[qi];
1735 ////////////////////////////////////////
1738 //printf("Process set %d, has %d queries\n",qi,curr_list.size());
1739 for(qj=0;qj<curr_list.size();++qj){
1741 fprintf(stderr,"Processing query %s (file %s) is_udop = %d\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(),qnodes[i]->is_udop);
1743 // Select the current query parse tree
1744 table_exp_t *fta_parse_tree = qnodes[i]->parse_tree;
1746 // if hfta only, try to fetch any missing schemas
1747 // from the registry (using the print_schema program).
1748 // Here I use a hack to avoid analyzing the query -- all referenced
1749 // tables must be in the from clause
1750 // If there is a problem loading any table, just issue a warning,
1752 tablevar_list_t *fm = fta_parse_tree->get_from();
1753 vector<string> refd_tbls = fm->get_src_tbls(Schema);
1754 // iterate over all referenced tables
1756 for(t=0;t<refd_tbls.size();++t){
1757 int tbl_ref = Schema->get_table_ref(refd_tbls[t]);
1759 if(tbl_ref < 0){ // if this table is not in the Schema
1762 string cmd="print_schema "+refd_tbls[t];
1763 FILE *schema_in = popen(cmd.c_str(), "r");
1764 if(schema_in == NULL){
1765 fprintf(stderr,"ERROR: cannot execute command %s\n",cmd.c_str());
1767 string schema_instr;
1768 while(fgets(tmpstr,TMPSTRLEN,schema_in)){
1769 schema_instr += tmpstr;
1771 fta_parse_result = new fta_parse_t();
1772 strcpy(tmp_schema_str,schema_instr.c_str());
1773 FtaParser_setstringinput(tmp_schema_str);
1774 if(FtaParserparse()){
1775 fprintf(stderr,"FTA parse failed on result from command %s.\n",cmd.c_str());
1777 if( fta_parse_result->tables != NULL){
1779 for(tl=0;tl<fta_parse_result->tables->size();++tl){
1780 Schema->add_table(fta_parse_result->tables->get_table(tl));
1783 fprintf(stderr,"ERROR command %s returned no tables.\n",cmd.c_str());
1788 fprintf(stderr,"ERROR, query %s (file %s) references stream %s, which is neither a PROTOCOL nor an externally visible query stream.\n", qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), refd_tbls[t].c_str());
1796 // Analyze the query.
1797 query_summary_class *qs = analyze_fta(fta_parse_tree, Schema, Ext_fcns, qnodes[i]->name);
1799 fprintf(stderr,"analyze_fta failed on query %s (file %s) \n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str());
1803 stream_query new_sq(qs, Schema);
1804 if(new_sq.error_code){
1805 fprintf(stderr,"ERROR, can't create query node for query %s (file %s):\n%s\n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str(),new_sq.err_str.c_str());
1809 // Add it to the Schema
1810 table_def *output_td = new_sq.get_output_tabledef();
1811 Schema->add_table(output_td);
1813 // Create a query plan from the analyzed parse tree.
1814 // If it's a query referenced via FROM, add it to the stream query.
1816 rootq->add_query(new_sq);
1818 rootq = new stream_query(new_sq);
1819 // have the stream query object inherit properties form the analyzed
1820 // hfta_node object.
1821 rootq->set_nparallel(hfta_sets[hfta_id]->n_parallel,hfta_sets[hfta_id]->parallel_idx);
1822 rootq->n_successors = hfta_sets[hfta_id]->sources_to.size();
1828 // This stream query has all its parts
1829 // Build and optimize it.
1830 //printf("translate_fta: generating plan.\n");
1831 if(rootq->generate_plan(Schema)){
1832 fprintf(stderr,"INTERNAL ERROR: Can't generate query plan for query %s.\n",rootq->query_name.c_str());
1836 // If we've found the query plan head, so now add the output operators
1837 if(qname_to_ospec.count(hfta_sets[hfta_id]->source_name)){
1838 pair< multimap<string, int>::iterator, multimap<string, int>::iterator > oset;
1839 multimap<string, int>::iterator mmsi;
1840 oset = qname_to_ospec.equal_range(hfta_sets[hfta_id]->source_name);
1841 for(mmsi=oset.first; mmsi!=oset.second; ++mmsi){
1842 rootq->add_output_operator(output_specs[(*mmsi).second]);
1848 // Perform query splitting if necessary.
1850 vector<stream_query *> split_queries = rootq->split_query(Ext_fcns, Schema, hfta_returned, ifaces_db, n_virtual_interfaces, hfta_sets[hfta_id]->n_parallel, hfta_sets[hfta_id]->parallel_idx);
1853 //for(l=0;l<split_queries.size();++l){
1854 //printf("split query %d is %s\n",l,split_queries[l]->q`uery_name.c_str());
1860 if(split_queries.size() > 0){ // should be at least one component.
1862 // Compute the number of LFTAs.
1863 int n_lfta = split_queries.size();
1864 if(hfta_returned) n_lfta--;
1867 // Process the LFTA components.
1868 for(l=0;l<n_lfta;++l){
1869 if(lfta_names.count(split_queries[l]->query_name) == 0){
1870 // Grab the lfta for global optimization.
1871 vector<tablevar_t *> tvec = split_queries[l]->query_plan[0]->get_input_tbls();
1872 string liface = tvec[0]->get_interface();
1873 string lmach = tvec[0]->get_machine();
1876 interface_names.push_back(liface);
1877 machine_names.push_back(lmach);
1878 //printf("Machine is %s\n",lmach.c_str());
1880 // Set the ht size from the recommendation, if there is one in the rec file
1881 if(lfta_htsize.count(split_queries[l]->query_name)>0){
1882 split_queries[l]->query_plan[0]->set_definition("aggregate_slots",int_to_string(lfta_htsize[split_queries[l]->query_name]));
1886 lfta_names[split_queries[l]->query_name] = lfta_list.size();
1887 split_queries[l]->set_gid(lfta_list.size()); // set lfta global id
1888 lfta_list.push_back(split_queries[l]);
1889 lfta_mach_lists[lmach].push_back(split_queries[l]);
1891 // The following is a hack,
1892 // as I should be generating LFTA code through
1893 // the stream_query object.
1894 split_queries[l]->query_plan[0]->bind_to_schema(Schema);
1895 // split_queries[l]->query_plan[0]->definitions = split_queries[l]->defines;
1898 // Create query description to embed in lfta.c
1899 string lfta_schema_str = split_queries[l]->make_schema();
1900 string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
1902 // get NIC capabilities.
1904 nic_property *nicprop = NULL;
1905 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
1906 if(iface_codegen_type.size()){
1907 nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
1909 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
1914 lfta_val[lmach] += generate_lfta_block(split_queries[l]->query_plan[0], Schema, split_queries[l]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop);
1917 snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema));
1918 query_names.push_back(split_queries[l]->query_name);
1919 mach_query_names[lmach].push_back(query_names.size()-1);
1920 // NOTE: I will assume a 1-1 correspondance between
1921 // mach_query_names[lmach] and lfta_mach_lists[lmach]
1922 // where mach_query_names[lmach][i] contains the index into
1923 // query_names, which names the lfta, and
1924 // mach_query_names[lmach][i] is the stream_query * of the
1925 // corresponding lfta.
1926 // Later, lfta_iface_qnames are the query names matching lfta_iface_lists
1930 // check if lfta is reusable
1931 // FTA is reusable if reusable option is set explicitly or it doesn't have any parameters
1933 bool lfta_reusable = false;
1934 if (split_queries[l]->query_plan[0]->get_val_of_def("reusable") == "yes" ||
1935 split_queries[l]->query_plan[0]->get_param_tbl()->size() == 0) {
1936 lfta_reusable = true;
1938 lfta_reuse_options.push_back(lfta_reusable);
1940 // LFTA will inherit the liveness timeout specification from the containing query
1941 // it is too conservative as lfta are expected to spend less time per tuple
1944 // extract liveness timeout from query definition
1945 int liveness_timeout = atoi(split_queries[l]->query_plan[0]->get_val_of_def("liveness_timeout").c_str());
1946 if (!liveness_timeout) {
1947 // fprintf(stderr, "WARNING: No liveness timeout specified for lfta %s, using default value of %d\n",
1948 // split_queries[l]->query_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
1949 liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
1951 lfta_liveness_timeouts.push_back(liveness_timeout);
1953 // Add it to the schema
1954 table_def *td = split_queries[l]->get_output_tabledef();
1955 Schema->append_table(td);
1956 //printf("added lfta %d (%s)\n",l,split_queries[l]->query_name.c_str());
1961 // If the output is lfta-only, dump out the query name.
1962 if(split_queries.size() == 1 && !hfta_returned){
1963 if(output_query_names ){
1964 fprintf(query_name_output,"%s L\n",split_queries[0]->query_name.c_str());
1968 fprintf(stderr,"query name is %s\n",split_queries[0]->query_name.c_str());
1973 // output schema summary
1974 if(output_schema_summary){
1975 dump_summary(split_queries[0]);
1981 if(hfta_returned){ // query also has an HFTA component
1982 int hfta_nbr = split_queries.size()-1;
1984 hfta_list.push_back(split_queries[hfta_nbr]);
1986 // report on generated query names
1987 if(output_query_names){
1988 string hfta_name =split_queries[hfta_nbr]->query_name;
1989 fprintf(query_name_output,"%s H\n",hfta_name.c_str());
1990 for(l=0;l<hfta_nbr;++l){
1991 string lfta_name =split_queries[l]->query_name;
1992 fprintf(query_name_output,"%s L\n",lfta_name.c_str());
1996 // fprintf(stderr,"query names are ");
1997 // for(l=0;l<hfta_nbr;++l){
1998 // if(l>0) fprintf(stderr,",");
1999 // string fta_name =split_queries[l]->query_name;
2000 // fprintf(stderr," %s",fta_name.c_str());
2002 // fprintf(stderr,"\n");
2007 fprintf(stderr,"ERROR, query in file %s has a bug.\n",qnodes[i]->file.c_str());
2008 fprintf(stderr,"%s\n",rootq->get_error_str().c_str());
2015 //-----------------------------------------------------------------
2016 // Compute and propagate the SE in PROTOCOL fields compute a field.
2017 //-----------------------------------------------------------------
2019 for(i=0;i<lfta_list.size();i++){
2020 lfta_list[i]->generate_protocol_se(sq_map, Schema);
2021 sq_map[lfta_list[i]->get_sq_name()] = lfta_list[i];
2023 for(i=0;i<hfta_list.size();i++){
2024 hfta_list[i]->generate_protocol_se(sq_map, Schema);
2025 sq_map[hfta_list[i]->get_sq_name()] = hfta_list[i];
2030 //------------------------------------------------------------------------
2031 // Perform individual FTA optimizations
2032 //-----------------------------------------------------------------------
2034 if (partitioned_mode) {
2036 // open partition definition file
2037 string part_fname = config_dir_path + "partition.txt";
2039 FILE* partfd = fopen(part_fname.c_str(), "r");
2041 fprintf(stderr,"ERROR, unable to open partition definition file %s\n",part_fname.c_str());
2044 PartnParser_setfileinput(partfd);
2045 if (PartnParserparse()) {
2046 fprintf(stderr,"ERROR, unable to parse partition definition file %s\n",part_fname.c_str());
2053 print_hftas("preopt_hfta_info.txt", hfta_list, lfta_names, query_names, interface_names);
2055 int num_hfta = hfta_list.size();
2056 for(i=0; i < hfta_list.size(); ++i){
2057 hfta_list[i]->optimize(hfta_list, lfta_names, interface_names, machine_names, Ext_fcns, Schema, ifaces_db, partn_parse_result);
2060 // Add all new hftas to schema
2061 for(i=num_hfta; i < hfta_list.size(); ++i){
2062 table_def *td = hfta_list[i]->get_output_tabledef();
2063 Schema->append_table(td);
2066 print_hftas("postopt_hfta_info.txt", hfta_list, lfta_names, query_names, interface_names);
2070 //------------------------------------------------------------------------
2071 // Do global (cross-fta) optimization
2072 //-----------------------------------------------------------------------
2079 set<string> extra_external_libs;
2081 for(i=0;i<hfta_list.size();++i){ // query also has an HFTA component
2084 // build hfta file name, create output
2085 if(numeric_hfta_flname){
2086 sprintf(tmpstr,"hfta_%d",hfta_count);
2087 hfta_names.push_back(tmpstr);
2088 sprintf(tmpstr,"hfta_%d.cc",hfta_count);
2090 sprintf(tmpstr,"hfta_%s",hfta_list[i]->query_name.c_str());
2091 hfta_names.push_back(tmpstr);
2092 sprintf(tmpstr,"hfta_%s.cc",hfta_list[i]->query_name.c_str());
2094 FILE *hfta_fl = fopen(tmpstr,"w");
2095 if(hfta_fl == NULL){
2096 fprintf(stderr,"ERROR can't open fta output file %s\n",tmpstr);
2099 fprintf(hfta_fl,"%s\n\n",hfta_list[i]->generate_hfta(Schema, Ext_fcns, opviews, distributed_mode).c_str());
2101 // If there is a field verifier, warn about
2102 // lack of compatibility
2103 // NOTE : this code assumes that visible non-lfta queries
2104 // are those at the root of a stream query.
2105 string hfta_comment;
2107 string hfta_namespace;
2108 if(hfta_list[i]->defines.count("comment")>0)
2109 hfta_comment = hfta_list[i]->defines["comment"];
2110 if(hfta_list[i]->defines.count("Comment")>0)
2111 hfta_comment = hfta_list[i]->defines["Comment"];
2112 if(hfta_list[i]->defines.count("COMMENT")>0)
2113 hfta_comment = hfta_list[i]->defines["COMMENT"];
2114 if(hfta_list[i]->defines.count("title")>0)
2115 hfta_title = hfta_list[i]->defines["title"];
2116 if(hfta_list[i]->defines.count("Title")>0)
2117 hfta_title = hfta_list[i]->defines["Title"];
2118 if(hfta_list[i]->defines.count("TITLE")>0)
2119 hfta_title = hfta_list[i]->defines["TITLE"];
2120 if(hfta_list[i]->defines.count("namespace")>0)
2121 hfta_namespace = hfta_list[i]->defines["namespace"];
2122 if(hfta_list[i]->defines.count("Namespace")>0)
2123 hfta_namespace = hfta_list[i]->defines["Namespace"];
2124 if(hfta_list[i]->defines.count("Namespace")>0)
2125 hfta_namespace = hfta_list[i]->defines["Namespace"];
2127 if(field_verifier != NULL){
2129 if(hfta_comment == "")
2130 warning_str += "\tcomment not found.\n";
2131 if(hfta_title == "")
2132 warning_str += "\ttitle not found.\n";
2133 if(hfta_namespace == "")
2134 warning_str += "\tnamespace not found.\n";
2136 vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2138 for(fi=0;fi<flds.size();fi++){
2139 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
2141 if(warning_str != "")
2142 fprintf(stderr,"Warning, in HFTA stream %s:\n%s",
2143 hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(),warning_str.c_str());
2146 fprintf(qtree_output,"\t<HFTA name='%s'>\n",hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
2147 if(hfta_comment != "")
2148 fprintf(qtree_output,"\t\t<Description value='%s' />\n",hfta_comment.c_str());
2149 if(hfta_title != "")
2150 fprintf(qtree_output,"\t\t<Title value='%s' />\n",hfta_title.c_str());
2151 if(hfta_namespace != "")
2152 fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",hfta_namespace.c_str());
2153 fprintf(qtree_output,"\t\t<FileName value='%s' />\n",tmpstr);
2154 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2156 // write info about fields to qtree.xml
2157 vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2159 for(fi=0;fi<flds.size();fi++){
2160 fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
2161 if(flds[fi]->get_modifier_list()->size()){
2162 fprintf(qtree_output,"mods='%s' ", flds[fi]->get_modifier_list()->to_string().c_str());
2164 fprintf(qtree_output," />\n");
2167 // extract liveness timeout from query definition
2168 int liveness_timeout = atoi(hfta_list[i]->query_plan[hfta_list[i]->qhead]->get_val_of_def("liveness_timeout").c_str());
2169 if (!liveness_timeout) {
2170 // fprintf(stderr, "WARNING: No liveness timeout specified for hfta %s, using default value of %d\n",
2171 // hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2172 liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2174 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", liveness_timeout);
2176 vector<tablevar_t *> tmp_tv = hfta_list[i]->get_input_tables();
2178 for(itv=0;itv<tmp_tv.size();++itv){
2179 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",tmp_tv[itv]->get_schema_name().c_str());
2181 string ifrs = hfta_list[i]->collect_refd_ifaces();
2183 fprintf(qtree_output,"\t\t<Ifaces_referenced value='%s'/>\n",ifrs.c_str());
2185 fprintf(qtree_output,"\t</HFTA>\n");
2189 // debug only -- do code generation to catch generation-time errors.
2190 hfta_list[i]->generate_hfta(Schema, Ext_fcns,opviews, distributed_mode);
2193 hfta_count++; // for hfta file names with numeric suffixes
2195 hfta_list[i]->get_external_libs(extra_external_libs);
2199 string ext_lib_string;
2200 set<string>::iterator ssi_el;
2201 for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_el)
2202 ext_lib_string += (*ssi_el)+" ";
2206 // Report on the set of operator views
2207 for(i=0;i<opviews.size();++i){
2208 opview_entry *opve = opviews.get_entry(i);
2209 fprintf(qtree_output,"\t<UDOP name='%s' >\n",opve->view_name.c_str());
2210 fprintf(qtree_output,"\t\t<FileName value='%s' />\n",opve->exec_fl.c_str());
2211 fprintf(qtree_output,"\t\t<Parent value='%s' />\n",opve->root_name.c_str());
2212 fprintf(qtree_output,"\t\t<Alias value='%s' />\n",opve->udop_alias.c_str());
2213 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2215 if (!opve->liveness_timeout) {
2216 // fprintf(stderr, "WARNING: No liveness timeout specified for view %s, using default value of %d\n",
2217 // opve->view_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2218 opve->liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2220 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", opve->liveness_timeout);
2222 for(j=0;j<opve->subq_names.size();j++)
2223 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",opve->subq_names[j].c_str());
2224 fprintf(qtree_output,"\t</UDOP>\n");
2228 //-----------------------------------------------------------------
2230 // Create interface-specific meta code files.
2231 // first, open and parse the interface resources file.
2232 ifaces_db = new ifq_t();
2234 if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
2235 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
2236 ifx_fname.c_str(), ierr.c_str());
2240 map<string, vector<stream_query *> >::iterator svsi;
2241 for(svsi=lfta_mach_lists.begin(); svsi!=lfta_mach_lists.end(); ++svsi){
2242 string lmach = (*svsi).first;
2244 // For this machine, create a set of lftas per interface.
2245 vector<stream_query *> mach_lftas = (*svsi).second;
2246 map<string, vector<stream_query *> > lfta_iface_lists;
2248 for(li=0;li<mach_lftas.size();++li){
2249 vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
2250 string lfta_iface = tvec[0]->get_interface();
2251 lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2254 map<string, vector<stream_query *> >::iterator lsvsi;
2255 for(lsvsi=lfta_iface_lists.begin(); lsvsi!=lfta_iface_lists.end(); ++lsvsi){
2257 string liface = (*lsvsi).first;
2258 vector<stream_query *> iface_lftas = (*lsvsi).second;
2259 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2260 if(iface_codegen_type.size()){
2261 nic_property *nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2263 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2266 string mcs = generate_nic_code(iface_lftas, nicprop);
2269 mcf_flnm = lmach + "_"+liface+".mcf";
2271 mcf_flnm = hostname + "_"+liface+".mcf";
2273 if((mcf_fl = fopen(mcf_flnm.c_str(),"w")) == NULL){
2274 fprintf(stderr,"Can't open output file %s\n%s\n",mcf_flnm.c_str(),strerror(errno));
2277 fprintf(mcf_fl,"%s",mcs.c_str());
2279 //printf("mcs of machine %s, iface %s of type %s is \n%s\n",
2280 //lmach.c_str(), liface.c_str(),iface_codegen_type[0].c_str(), mcs.c_str());
2289 //-----------------------------------------------------------------
2292 // Find common filter predicates in the LFTAs.
2293 // in addition generate structs to store the temporal attributes unpacked by prefilter
2295 map<string, vector<stream_query *> >::iterator ssqi;
2296 for(ssqi=lfta_mach_lists.begin(); ssqi!=lfta_mach_lists.end(); ++ssqi){
2298 string lmach = (*ssqi).first;
2299 bool packed_return = false;
2303 // The LFTAs of this machine.
2304 vector<stream_query *> mach_lftas = (*ssqi).second;
2305 // break up on a per-interface basis.
2306 map<string, vector<stream_query *> > lfta_iface_lists;
2307 map<string, vector<int> > lfta_iface_qname_ix; // need the query name
2309 for(li=0;li<mach_lftas.size();++li){
2310 vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
2311 string lfta_iface = tvec[0]->get_interface();
2312 lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2313 lfta_iface_qname_ix[lfta_iface].push_back(mach_query_names[lmach][li]);
2317 // Are the return values "packed"?
2318 // This should be done on a per-interface basis.
2319 // But this is defunct code for gs-lite
2320 for(li=0;li<mach_lftas.size();++li){
2321 vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
2322 string liface = tvec[0]->get_interface();
2323 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2324 if(iface_codegen_type.size()){
2325 if(npdb->get_nic_prop_val(iface_codegen_type[0],"Return",erri) == "Packed"){
2326 packed_return = true;
2332 // Separate lftas by interface, collect results on a per-interface basis.
2334 vector<cnf_set *> no_preds; // fallback if there is no prefilter
2335 map<string, vector<cnf_set *> > prefilter_preds;
2336 set<unsigned int> pred_ids; // this can be global for all interfaces
2337 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2338 string liface = (*mvsi).first;
2339 vector<cnf_set *> empty_list;
2340 prefilter_preds[liface] = empty_list;
2341 if(! packed_return){
2342 get_common_lfta_filter(lfta_iface_lists[liface], Schema,Ext_fcns, prefilter_preds[liface], pred_ids);
2345 // get NIC capabilities. (Is this needed?)
2346 nic_property *nicprop = NULL;
2347 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2348 if(iface_codegen_type.size()){
2349 nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2351 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2358 // Now that we know the prefilter preds, generate the lfta code.
2359 // Do this for all lftas in this machine.
2360 for(li=0;li<mach_lftas.size();++li){
2361 set<unsigned int> subsumed_preds;
2362 set<unsigned int>::iterator sii;
2364 for(sii=pred_ids.begin();sii!=pred_ids.end();++sii){
2366 if((pid>>16) == li){
2367 subsumed_preds.insert(pid & 0xffff);
2371 string lfta_schema_str = mach_lftas[li]->make_schema();
2372 string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
2373 nic_property *nicprop = NULL; // no NIC properties?
2374 lfta_val[lmach] += generate_lfta_block(mach_lftas[li]->query_plan[0], Schema, mach_lftas[li]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop,subsumed_preds);
2378 // generate structs to store the temporal attributes
2379 // unpacked by prefilter
2380 col_id_set temp_cids;
2381 get_prefilter_temporal_cids(lfta_mach_lists[lmach], temp_cids);
2382 lfta_prefilter_val[lmach] = generate_lfta_prefilter_struct(temp_cids, Schema);
2384 // Compute the lfta bit signatures and the lfta colrefs
2385 // do this on a per-interface basis
2387 lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2389 map<string, vector<long long int> > lfta_sigs; // used again later
2390 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2391 string liface = (*mvsi).first;
2392 vector<long long int> empty_list;
2393 lfta_sigs[liface] = empty_list;
2395 vector<col_id_set> lfta_cols;
2396 vector<int> lfta_snap_length;
2397 for(li=0;li<lfta_iface_lists[liface].size();++li){
2398 unsigned long long int mask=0, bpos=1;
2400 for(f_pos=0;f_pos<prefilter_preds[liface].size();++f_pos){
2401 if(prefilter_preds[liface][f_pos]->lfta_id.count(li))
2405 lfta_sigs[liface].push_back(mask);
2406 lfta_cols.push_back(lfta_iface_lists[liface][li]->query_plan[0]->get_colrefs(true,Schema));
2407 lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema));
2410 //for(li=0;li<mach_lftas.size();++li){
2411 //printf("lfta %d, mask is %llu\n",li,lfta_sigs[li]);
2412 //col_id_set::iterator tcisi;
2413 //for(tcisi=lfta_cols[li].begin(); tcisi!=lfta_cols[li].end();++tcisi){
2414 //printf("\t%s %d (%d)\n",(*tcisi).field.c_str(),(*tcisi).schema_ref, (*tcisi).tblvar_ref);
2419 // generate the prefilter
2420 // Do this on a per-interface basis, except for the #define
2422 // lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2423 lfta_val[lmach] += generate_lfta_prefilter(prefilter_preds[liface], temp_cids, Schema, Ext_fcns, lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2425 lfta_val[lmach] += generate_lfta_prefilter(no_preds, temp_cids, Schema,Ext_fcns, lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2430 // Generate interface parameter lookup function
2431 lfta_val[lmach] += "// lookup interface properties by name\n";
2432 lfta_val[lmach] += "// mulitple values of given property returned in the same string separated by commas\n";
2433 lfta_val[lmach] += "// returns NULL if given property does not exist\n";
2434 lfta_val[lmach] += "gs_sp_t get_iface_properties (const gs_sp_t iface_name, const gs_sp_t property_name) {\n";
2436 // collect a list of interface names used by queries running on this host
2437 set<std::string> iface_names;
2438 for(i=0;i<mach_query_names[lmach].size();i++){
2439 int mi = mach_query_names[lmach][i];
2440 stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2442 if(interface_names[mi]=="")
2443 iface_names.insert("DEFAULTDEV");
2445 iface_names.insert(interface_names[mi]);
2448 // generate interface property lookup code for every interface
2449 set<std::string>::iterator sir;
2450 for (sir = iface_names.begin(); sir != iface_names.end(); sir++) {
2451 if (sir == iface_names.begin())
2452 lfta_val[lmach] += "\tif (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2454 lfta_val[lmach] += "\t} else if (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2456 // iterate through interface properties
2457 vector<string> iface_properties = ifaces_db->get_iface_properties(lmach,*sir,erri,err_str);
2459 fprintf(stderr,"ERROR cannot retrieve interface properties for %s.%s, %s\n",lmach.c_str(), sir->c_str(), err_str.c_str());
2462 if (iface_properties.empty())
2463 lfta_val[lmach] += "\t\treturn NULL;\n";
2465 for (int i = 0; i < iface_properties.size(); ++i) {
2467 lfta_val[lmach] += "\t\tif (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2469 lfta_val[lmach] += "\t\t} else if (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2471 // combine all values for the interface property using comma separator
2472 vector<string> vals = ifaces_db->get_iface_vals(lmach,*sir,iface_properties[i],erri,err_str);
2473 for (int j = 0; j < vals.size(); ++j) {
2474 lfta_val[lmach] += "\t\t\treturn \"" + vals[j];
2475 if (j != vals.size()-1)
2476 lfta_val[lmach] += ",";
2478 lfta_val[lmach] += "\";\n";
2480 lfta_val[lmach] += "\t\t} else\n";
2481 lfta_val[lmach] += "\t\t\treturn NULL;\n";
2484 lfta_val[lmach] += "\t} else\n";
2485 lfta_val[lmach] += "\t\treturn NULL;\n";
2486 lfta_val[lmach] += "}\n\n";
2489 // Generate a full list of FTAs for clearinghouse reference
2490 lfta_val[lmach] += "// list of FTAs clearinghouse expects to register themselves\n";
2491 lfta_val[lmach] += "const gs_sp_t fta_names[] = {";
2493 for (i = 0; i < query_names.size(); ++i) {
2495 lfta_val[lmach] += ", ";
2496 lfta_val[lmach] += "\"" + query_names[i] + "\"";
2498 for (i = 0; i < hfta_list.size(); ++i) {
2499 lfta_val[lmach] += ", \"" + hfta_list[i]->query_name + "\"";
2501 lfta_val[lmach] += ", NULL};\n\n";
2504 // Add the initialization function to lfta.c
2505 // Change to accept the interface name, and
2506 // set the prefilter function accordingly.
2507 // see the example in demo/err2
2508 lfta_val[lmach] += "void fta_init(gs_sp_t device){\n";
2510 // for(i=0;i<mach_query_names[lmach].size();i++)
2511 // int mi = mach_query_names[lmach][i];
2512 // stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2514 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2515 string liface = (*mvsi).first;
2516 vector<stream_query *> lfta_list = (*mvsi).second;
2517 for(i=0;i<lfta_list.size();i++){
2518 stream_query *lfta_sq = lfta_list[i];
2519 int mi = lfta_iface_qname_ix[liface][i];
2521 fprintf(stderr,"interface %s, LFTA %d, snap length is %d\n",liface.c_str(),mi,snap_lengths[mi]);
2523 lfta_val[lmach] += "\tfta_register(\""+query_names[mi]+"\", " + (lfta_reuse_options[mi]?"1":"0") + ", ";
2524 if(interface_names[mi]=="")
2525 lfta_val[lmach]+="DEFAULTDEV";
2527 lfta_val[lmach]+='"'+interface_names[mi]+'"';
2529 lfta_val[lmach] += ", "+generate_alloc_name(query_names[mi])
2530 +"\n#ifndef LFTA_IN_NIC\n\t,"+generate_schema_string_name(query_names[mi])
2532 sprintf(tmpstr,",%d",snap_lengths[mi]);
2533 lfta_val[lmach] += tmpstr;
2535 // unsigned long long int mask=0, bpos=1;
2537 // for(f_pos=0;f_pos<prefilter_preds.size();++f_pos){
2538 // if(prefilter_preds[f_pos]->lfta_id.count(i))
2540 // bpos = bpos << 1;
2544 // sprintf(tmpstr,",%lluull",mask);
2545 sprintf(tmpstr,",%lluull",lfta_sigs[liface][i]);
2546 lfta_val[lmach]+=tmpstr;
2548 lfta_val[lmach] += ",0ull";
2551 lfta_val[lmach] += ");\n";
2555 // End of lfta prefilter stuff
2556 // --------------------------------------------------
2558 // If there is a field verifier, warn about
2559 // lack of compatibility
2560 string lfta_comment;
2562 string lfta_namespace;
2563 map<string,string> ldefs = lfta_sq->query_plan[0]->get_definitions();
2564 if(ldefs.count("comment")>0)
2565 lfta_comment = lfta_sq->defines["comment"];
2566 if(ldefs.count("Comment")>0)
2567 lfta_comment = lfta_sq->defines["Comment"];
2568 if(ldefs.count("COMMENT")>0)
2569 lfta_comment = lfta_sq->defines["COMMENT"];
2570 if(ldefs.count("title")>0)
2571 lfta_title = lfta_sq->defines["title"];
2572 if(ldefs.count("Title")>0)
2573 lfta_title = lfta_sq->defines["Title"];
2574 if(ldefs.count("TITLE")>0)
2575 lfta_title = lfta_sq->defines["TITLE"];
2576 if(ldefs.count("NAMESPACE")>0)
2577 lfta_namespace = lfta_sq->defines["NAMESPACE"];
2578 if(ldefs.count("Namespace")>0)
2579 lfta_namespace = lfta_sq->defines["Namespace"];
2580 if(ldefs.count("namespace")>0)
2581 lfta_namespace = lfta_sq->defines["namespace"];
2583 string lfta_ht_size;
2584 if(lfta_sq->query_plan[0]->node_type()== "sgah_qpn" || lfta_sq->query_plan[0]->node_type()== "rsgah_qpn")
2585 lfta_ht_size = int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE);
2586 if(ldefs.count("aggregate_slots")>0){
2587 lfta_ht_size = ldefs["aggregate_slots"];
2590 // NOTE : I'm assuming that visible lftas do not start with _fta.
2591 // -- will fail for non-visible simple selection queries.
2592 if(field_verifier != NULL && query_names[mi].substr(0,1) != "_"){
2594 if(lfta_comment == "")
2595 warning_str += "\tcomment not found.\n";
2596 if(lfta_title == "")
2597 warning_str += "\ttitle not found.\n";
2598 if(lfta_namespace == "")
2599 warning_str += "\tnamespace not found.\n";
2601 vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
2603 for(fi=0;fi<flds.size();fi++){
2604 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
2606 if(warning_str != "")
2607 fprintf(stderr,"Warning, in LFTA stream %s:\n%s",
2608 query_names[mi].c_str(),warning_str.c_str());
2612 // Create qtree output
2613 fprintf(qtree_output,"\t<LFTA name='%s' >\n",query_names[mi].c_str());
2614 if(lfta_comment != "")
2615 fprintf(qtree_output,"\t\t<Description value='%s' />\n",lfta_comment.c_str());
2616 if(lfta_title != "")
2617 fprintf(qtree_output,"\t\t<Title value='%s' />\n",lfta_title.c_str());
2618 if(lfta_namespace != "")
2619 fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",lfta_namespace.c_str());
2620 if(lfta_ht_size != "")
2621 fprintf(qtree_output,"\t\t<HtSize value='%s' />\n",lfta_ht_size.c_str());
2623 fprintf(qtree_output,"\t\t<Host value='%s' />\n",lmach.c_str());
2625 fprintf(qtree_output,"\t\t<Host value='%s' />\n",hostname.c_str());
2626 fprintf(qtree_output,"\t\t<Interface value='%s' />\n",interface_names[mi].c_str());
2627 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",interface_names[mi].c_str());
2628 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2629 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", lfta_liveness_timeouts[mi]);
2630 // write info about fields to qtree.xml
2631 vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
2633 for(fi=0;fi<flds.size();fi++){
2634 fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
2635 if(flds[fi]->get_modifier_list()->size()){
2636 fprintf(qtree_output,"mods='%s' ", flds[fi]->get_modifier_list()->to_string().c_str());
2638 fprintf(qtree_output," />\n");
2640 fprintf(qtree_output,"\t</LFTA>\n");
2646 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2647 string liface = (*mvsi).first;
2649 " if (!strcmp(device, \""+liface+"\")) \n"
2650 " lfta_prefilter = &lfta_prefilter_"+liface+"; \n"
2654 " if(lfta_prefilter == NULL){\n"
2655 " fprintf(stderr, \"Error, interface %s not found in fta_init.\\n\",device);\n"
2662 lfta_val[lmach] += "}\n\n";
2664 if(!(debug_only || hfta_only) ){
2667 lfta_flnm = lmach + "_lfta.c";
2669 lfta_flnm = hostname + "_lfta.c";
2670 if((lfta_out = fopen(lfta_flnm.c_str(),"w")) == NULL){
2671 fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
2674 fprintf(lfta_out,"%s",lfta_header.c_str());
2675 fprintf(lfta_out,"%s",lfta_prefilter_val[lmach].c_str());
2676 fprintf(lfta_out,"%s",lfta_val[lmach].c_str());
2681 // Say what are the operators which must execute
2682 if(opviews.size()>0)
2683 fprintf(stderr,"The queries use the following external operators:\n");
2684 for(i=0;i<opviews.size();++i){
2685 opview_entry *opv = opviews.get_entry(i);
2686 fprintf(stderr,"\t%s\n",opv->view_name.c_str());
2690 generate_makefile(input_file_names, nfiles, hfta_names, opviews,
2691 machine_names, schema_file_name,
2693 ifaces_db, config_dir_path,use_pads,ext_lib_string, rts_hload);
2696 fprintf(qtree_output,"</QueryNodes>\n");
2701 ////////////////////////////////////////////////////////////
2703 void generate_makefile(vector<string> &input_file_names, int nfiles,
2704 vector<string> &hfta_names, opview_set &opviews,
2705 vector<string> &machine_names,
2706 string schema_file_name,
2707 vector<string> &interface_names,
2708 ifq_t *ifdb, string &config_dir_path,
2711 map<string, vector<int> > &rts_hload
2715 if(config_dir_path != ""){
2716 config_dir_path = "-C "+config_dir_path;
2720 bool libz_exists = stat((root_path+"/lib/libz.a").c_str(),&sb) >= 0;
2721 bool libast_exists = stat((root_path+"/lib/libast.a").c_str(),&sb) >= 0;
2723 // if(libz_exists && !libast_exists){
2724 // fprintf(stderr,"Configuration ERROR: ../../lib/libz.a exists but ../../lib/libast.a does not.\n");
2728 // Get set of operator executable files to run
2730 set<string>::iterator ssi;
2731 for(i=0;i<opviews.size();++i){
2732 opview_entry *opv = opviews.get_entry(i);
2733 if(opv->exec_fl != "") op_fls.insert(opv->exec_fl);
2736 FILE *outfl = fopen("Makefile", "w");
2738 fprintf(stderr,"Can't open Makefile for write, exiting.\n");
2743 ("CPP= g++ -O3 -g -I "+root_path+"/include -I "+root_path+"/include/hfta\n"
2744 "CC= gcc -O3 -g -I . -I "+root_path+"/include -I "+root_path+"/include/lfta"
2748 fprintf(outfl," -DLFTA_STATS");
2750 // Gather the set of interfaces
2751 // Also, gather "base interface names" for use in computing
2752 // the hash splitting to virtual interfaces.
2753 // TODO : must update to handle machines
2755 set<string> base_vifaces; // base interfaces of virtual interfaces
2756 map<string, string> ifmachines;
2757 map<string, string> ifattrs;
2758 for(i=0;i<interface_names.size();++i){
2759 ifaces.insert(interface_names[i]);
2760 ifmachines[interface_names[i]] = machine_names[i];
2762 size_t Xpos = interface_names[i].find_last_of("X");
2763 if(Xpos!=string::npos){
2764 string iface = interface_names[i].substr(0,Xpos);
2765 base_vifaces.insert(iface);
2767 // get interface attributes and add them to the list
2770 // Do we need to include protobuf libraries?
2771 bool use_proto = false;
2774 for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
2775 string ifnm = (*ssi);
2776 vector<string> ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "InterfaceType", erri, err_str);
2777 for(int ift_i=0;ift_i<ift.size();ift_i++){
2778 if(ift[ift_i]=="PROTO"){
2788 for(i=0;i<hfta_names.size();++i)
2789 fprintf(outfl," %s",hfta_names[i].c_str());
2793 "rts: lfta.o "+root_path+"/lib/libgscphost.a "+root_path+"/lib/libgscplftaaux.a "+root_path+"/lib/libgscprts.a "+root_path+"/lib/libclearinghouse.a\n"
2794 "\tg++ -O3 -g -o rts lfta.o ").c_str(), outfl);
2796 fprintf(outfl,"-L. ");
2798 ("-L"+root_path+"/lib -lgscplftaaux ").c_str(), outfl);
2800 fprintf(outfl,"-lgscppads -lpads ");
2802 "-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux -lclearinghouse -lresolv -lpthread -lgscpinterface -lz");
2804 fprintf(outfl, " -lpz -lz -lbz ");
2805 if(libz_exists && libast_exists)
2806 fprintf(outfl," -last ");
2808 fprintf(outfl, " -ldll -ldl ");
2810 fprintf(outfl, " -L/usr/local/lib/ -lprotobuf-c ");
2811 fprintf(outfl," -lgscpaux");
2813 fprintf(outfl," -fprofile-arcs");
2818 "lfta.o: %s_lfta.c\n"
2819 "\t$(CC) -o lfta.o -c %s_lfta.c\n"
2821 "%s_lfta.c: external_fcns.def %s ",hostname.c_str(), hostname.c_str(), hostname.c_str(),schema_file_name.c_str());
2822 for(i=0;i<nfiles;++i)
2823 fprintf(outfl," %s",input_file_names[i].c_str());
2825 fprintf(outfl,"\n\t%s/bin/translate_fta %s %s ",root_path.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
2827 fprintf(outfl,"\n\t%s/bin/translate_fta -h %s %s %s ", root_path.c_str(), hostname.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
2829 for(i=0;i<nfiles;++i)
2830 fprintf(outfl," %s",input_file_names[i].c_str());
2831 fprintf(outfl,"\n");
2833 for(i=0;i<hfta_names.size();++i)
2836 "\t$(CPP) -o %s %s.o -L"+root_path+"/lib -lgscpapp -lgscphostaux -lgscphost -lgscpinterface -lgscphftaaux -lgscphostaux -lm -lgscpaux -lclearinghouse -lresolv -lpthread -lgscpaux -lgscphftaaux -lgscpaux %s\n"
2839 "\t$(CPP) -o %s.o -c %s.cc\n"
2842 hfta_names[i].c_str(), hfta_names[i].c_str(),
2843 hfta_names[i].c_str(), hfta_names[i].c_str(), extra_libs.c_str(),
2844 hfta_names[i].c_str(), hfta_names[i].c_str(),
2845 hfta_names[i].c_str(), hfta_names[i].c_str()
2850 "packet_schema.txt:\n"
2851 "\tln -s "+root_path+"/cfg/packet_schema.txt .\n"
2853 "external_fcns.def:\n"
2854 "\tln -s "+root_path+"/cfg/external_fcns.def .\n"
2857 "\trm -rf core rts *.o %s_lfta.c external_fcns.def packet_schema.txt").c_str(),hostname.c_str());
2858 for(i=0;i<hfta_names.size();++i)
2859 fprintf(outfl," %s %s.cc",hfta_names[i].c_str(),hfta_names[i].c_str());
2860 fprintf(outfl,"\n");
2866 // Gather the set of interfaces
2867 // TODO : must update to handle machines
2868 // TODO : lookup interface attributes and add them as a parameter to rts process
2869 outfl = fopen("runit", "w");
2871 fprintf(stderr,"Can't open runit for write, exiting.\n");
2879 +root_path+"/bin/gshub.py> /dev/null 2>&1 &\n"
2881 "if [ ! -f gshub.log ]\n"
2883 "\techo \"Failed to start bin/gshub.py\"\n"
2886 "ADDR=`cat gshub.log`\n"
2887 "ps opgid= $! >> gs.pids\n"
2888 "./rts $ADDR default ").c_str(), outfl);
2891 for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
2892 string ifnm = (*ssi);
2893 fprintf(outfl, "%s ",ifnm.c_str());
2894 vector<string> ifv = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "Command", erri, err_str);
2895 for(j=0;j<ifv.size();++j)
2896 fprintf(outfl, "%s ",ifv[j].c_str());
2898 fprintf(outfl, " &\n");
2899 fprintf(outfl, "echo $! >> gs.pids\n");
2900 for(i=0;i<hfta_names.size();++i)
2901 fprintf(outfl,"./%s $ADDR default &\n",hfta_names[i].c_str());
2903 for(j=0;j<opviews.opview_list.size();++j){
2904 fprintf(outfl,"%s/views/%s %s &\n",root_path.c_str(),opviews.opview_list[j]->exec_fl.c_str(), opviews.opview_list[j]->mangler.c_str());
2908 system("chmod +x runit");
2910 outfl = fopen("stopit", "w");
2912 fprintf(stderr,"Can't open stopit for write, exiting.\n");
2916 fprintf(outfl,"#!/bin/sh\n"
2918 "if [ ! -f gs.pids ]\n"
2922 "for pgid in `cat gs.pids`\n"
2924 "kill -TERM -$pgid\n"
2927 "for pgid in `cat gs.pids`\n"
2934 system("chmod +x stopit");
2936 //-----------------------------------------------
2938 /* For now disable support for virtual interfaces
2939 outfl = fopen("set_vinterface_hash.bat", "w");
2941 fprintf(stderr,"Can't open set_vinterface_hash.bat for write, exiting.\n");
2945 // The format should be determined by an entry in the ifres.xml file,
2946 // but for now hardcode the only example I have.
2947 for(ssi=base_vifaces.begin();ssi!=base_vifaces.end();++ssi){
2948 if(rts_hload.count((*ssi))){
2949 string iface_name = (*ssi);
2950 string iface_number = "";
2951 for(j=iface_name.size()-1; j>0 && iface_number == ""; j--){
2952 if(isdigit(iface_name[j])){
2953 iface_number = iface_name[j];
2954 if(j>0 && isdigit(iface_name[j-1]))
2955 iface_number = iface_name[j-1] + iface_number;
2959 fprintf(outfl,"dagconfig -d%s -S hat_range=",iface_number.c_str());
2960 vector<int> halloc = rts_hload[iface_name];
2962 for(j=0;j<halloc.size();++j){
2965 fprintf(outfl,"%d-%d",prev_limit,halloc[j]);
2966 prev_limit = halloc[j];
2968 fprintf(outfl,"\n");
2972 system("chmod +x set_vinterface_hash.bat");
2976 // Code for implementing a local schema
2978 table_list qpSchema;
2980 // Load the schemas of any LFTAs.
2982 for(l=0;l<hfta_nbr;++l){
2983 stream_query *sq0 = split_queries[l];
2984 table_def *td = sq0->get_output_tabledef();
2985 qpSchema.append_table(td);
2987 // load the schemas of any other ref'd tables.
2989 vector<tablevar_t *> input_tbl_names = split_queries[hfta_nbr]->get_input_tables();
2991 for(ti=0;ti<input_tbl_names.size();++ti){
2992 int tbl_ref = qpSchema.get_table_ref(input_tbl_names[ti]->get_schema_name());
2994 tbl_ref = Schema->get_table_ref(input_tbl_names[ti]->get_schema_name());
2996 fprintf(stderr,"ERROR file %s references table %s, which is not in the schema.\n",input_file_names[i].c_str(), (input_tbl_names[ti]->get_schema_name()).c_str());
2999 qpSchema.append_table(Schema->get_table(tbl_ref));
3004 // Functions related to parsing.
3007 static int split_string(char *instr,char sep, char **words,int max_words){
3013 words[nwords++] = str;
3014 while( (loc = strchr(str,sep)) != NULL){
3017 if(nwords >= max_words){
3018 fprintf(stderr,"Error in split_string, too many words discovered (max is %d)\n",max_words);
3019 nwords = max_words-1;
3021 words[nwords++] = str;