1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
16 #include<unistd.h> // for gethostname
19 #include "parse_fta.h"
20 #include "parse_schema.h"
21 #include "parse_ext_fcns.h"
22 #include"analyze_fta.h"
23 #include"query_plan.h"
24 #include"generate_lfta_code.h"
25 #include"stream_query.h"
26 #include"generate_utils.h"
28 #include"generate_nic_code.h"
39 #include <sys/types.h>
45 // to verify that some files exist.
46 #include <sys/types.h>
49 #include "parse_partn.h"
51 #include "print_plan.h"
53 // Interface to the xml parser
56 #include"field_list.h"
60 extern int xmlParserparse(void);
61 extern FILE *xmlParserin;
62 extern int xmlParserdebug;
64 std::vector<std::string> xml_attr_vec;
65 std::vector<std::string> xml_val_vec;
66 std::string xml_a, xml_v;
67 xml_t *xml_leaves = NULL;
69 // Interface to the field list verifier
70 field_list *field_verifier = NULL;
72 #define TMPSTRLEN 1000
75 #define PATH_DELIM '/'
78 char tmp_schema_str[10000];
80 // maximum delay between two heartbeats produced
81 // by UDOP. Used when it's not explicitly
82 // provided in udop definition
83 #define DEFAULT_UDOP_LIVENESS_TIMEOUT 5
85 // Default lfta hash table size, must be power of 2.
86 int DEFAULT_LFTA_HASH_TABLE_SIZE = 4096;
88 // Interface to FTA definition lexer and parser ...
90 extern int FtaParserparse(void);
91 extern FILE *FtaParserin;
92 extern int FtaParserdebug;
94 fta_parse_t *fta_parse_result;
95 var_defs_t *fta_parse_defines;
99 // Interface to external function lexer and parser ...
101 extern int Ext_fcnsParserparse(void);
102 extern FILE *Ext_fcnsParserin;
103 extern int Ext_fcnsParserdebug;
105 ext_fcn_list *Ext_fcns;
108 // Interface to partition definition parser
109 extern int PartnParserparse();
110 partn_def_list_t *partn_parse_result = NULL;
118 // forward declaration of local utility function
119 void generate_makefile(vector<string> &input_file_names, int nfiles,
120 vector<string> &hfta_names, opview_set &opviews,
121 vector<string> &machine_names,
122 string schema_file_name,
123 vector<string> &interface_names,
124 ifq_t *ifdb, string &config_dir_path,
127 map<string, vector<int> > &rts_hload
130 //static int split_string(char *instr,char sep, char **words,int max_words);
133 FILE *schema_summary_output = NULL; // query names
135 // Dump schema summary
136 void dump_summary(stream_query *str){
// Emit a 3-line summary of one query's output schema to
// schema_summary_output (opened in main under the -f flag):
//   line 1: the query name
//   line 2: '|'-separated output field names
//   line 3: '|'-separated output field types
 fprintf(schema_summary_output,"%s\n",str->query_name.c_str());
139 table_def *sch = str->get_output_tabledef();
141 vector<field_entry *> flds = sch->get_fields();
// Field names, pipe-delimited.
// NOTE(review): loop index 'f' is declared on a line elided from this
// excerpt, as are the loops' closing braces — confirm against the full file.
143 for(f=0;f<flds.size();++f){
144 if(f>0) fprintf(schema_summary_output,"|");
145 fprintf(schema_summary_output,"%s",flds[f]->get_name().c_str());
147 fprintf(schema_summary_output,"\n");
// Field types, pipe-delimited, in the same order as the names above.
148 for(f=0;f<flds.size();++f){
149 if(f>0) fprintf(schema_summary_output,"|");
150 fprintf(schema_summary_output,"%s",flds[f]->get_type().c_str());
152 fprintf(schema_summary_output,"\n");
156 string hostname; // name of current host.
158 bool generate_stats = false;
159 string root_path = "../..";
162 int main(int argc, char **argv){
163 char tmpstr[TMPSTRLEN];
167 set<int>::iterator si;
169 vector<string> registration_query_names; // for lfta.c registration
170 map<string, vector<int> > mach_query_names; // list queries of machine
171 vector<int> snap_lengths; // for lfta.c registration
172 vector<int> snap_position; // for lfta.c registration
173 vector<string> interface_names; // for lfta.c registration
174 vector<string> machine_names; // machine of interface
175 vector<bool> lfta_reuse_options; // for lfta.c registration
176 vector<int> lfta_liveness_timeouts; // for qtree.xml generation
177 vector<string> hfta_names; // hfta source code names, for
178 // creating make file.
179 vector<string> qnames; // ensure unique names
180 map<string, int> lfta_names; // keep track of unique lftas.
183 // set these to 1 to debug the parser
185 Ext_fcnsParserdebug = 0;
187 FILE *lfta_out; // lfta.c output.
188 FILE *fta_in; // input file
189 FILE *table_schemas_in; // source tables definition file
190 FILE *query_name_output; // query names
191 FILE *qtree_output; // interconnections of query nodes
193 // -------------------------------
194 // Handling of Input Arguments
195 // -------------------------------
196 char optstr[] = "BDpLC:l:HNQMf:PSh:n:cR:";
197 const char *usage_str = "Usage: %s [-B] [-D] [-p] [-L] [-N] [-H] [-Q] [-M] [-C <config directory>] [-l <library directory>] [-f] [-P] [-S] [-n n_virtual_interfaces] [-h hostname] [-R root_path] [schema_file] input_file [input file ...]\n"
198 "\t[-B] : debug only (don't create output files)\n"
199 "\t[-D] : distributed mode (will use cluster.ifq instead of local .ifq file)\n"
200 "\t[-p] : partitioned mode (will use partition information in partition.txt to drive query optimization)\n"
201 "\t[-L] : use live_hosts.txt file to restrict queries to a set of live hosts\n"
202 "\t[-C] : use <config directory> for definition files\n"
203 "\t[-l] : use <library directory> for library queries\n"
204 "\t[-N] : output query names in query_names.txt\n"
205 "\t[-H] : create HFTA only (no schema_file)\n"
206 "\t[-Q] : use query name for hfta suffix\n"
207 "\t[-M] : generate make file and runit, stopit scripts\n"
208 "\t[-S] : enable LFTA statistics (alters Makefile).\n"
209 "\t[-f] : Output schema summary to schema_summary.txt\n"
210 "\t[-P] : link with PADS\n"
211 "\t[-h] : override host name.\n"
212 "\t[-c] : clean out Makefile and hfta_*.cc first.\n"
213 "\t[-R] : path to root of GS-lite\n"
216 // parameters gathered from command line processing
217 string external_fcns_path;
218 // string internal_fcn_path;
219 string config_dir_path;
220 string library_path = "./";
221 vector<string> input_file_names;
222 string schema_file_name;
223 bool debug_only = false;
224 bool hfta_only = false;
225 bool output_query_names = false;
226 bool output_schema_summary=false;
227 bool numeric_hfta_flname = true;
228 bool create_makefile = false;
229 bool distributed_mode = false;
230 bool partitioned_mode = false;
231 bool use_live_hosts_file = false;
232 bool use_pads = false;
233 bool clean_make = false;
234 int n_virtual_interfaces = 1;
237 while((chopt = getopt(argc,argv,optstr)) != -1){
243 distributed_mode = true;
246 partitioned_mode = true;
249 use_live_hosts_file = true;
253 config_dir_path = string(optarg) + string("/");
257 library_path = string(optarg) + string("/");
260 output_query_names = true;
263 numeric_hfta_flname = false;
266 if(schema_file_name == ""){
271 output_schema_summary=true;
274 create_makefile=true;
295 n_virtual_interfaces = atoi(optarg);
296 if(n_virtual_interfaces < 1 || n_virtual_interfaces > 128){
297 fprintf(stderr,"Warning, %d virtual interfaces specified, valid values are between 1 and 128 inclusive\n",n_virtual_interfaces);
298 n_virtual_interfaces = 1;
303 fprintf(stderr,"Error, argument %c not recognized.\n",optopt);
304 fprintf(stderr,"%s\n", usage_str);
307 fprintf(stderr, "Argument was %c\n", optopt);
308 fprintf(stderr,"Invalid arguments\n");
309 fprintf(stderr,"%s\n", usage_str);
315 for (int i = 0; i < argc; ++i) {
316 if((schema_file_name == "") && !hfta_only){
317 schema_file_name = argv[i];
319 input_file_names.push_back(argv[i]);
323 if(input_file_names.size() == 0){
324 fprintf(stderr,"%s\n", usage_str);
329 string clean_cmd = "rm Makefile hfta_*.cc";
330 int clean_ret = system(clean_cmd.c_str());
332 fprintf(stderr,"Warning, return value %d when trying to clean out old source files.\n", clean_ret);
337 nic_prop_db *npdb = new nic_prop_db(config_dir_path);
339 // Open globally used file names.
341 // prepend config directory to schema file
342 schema_file_name = config_dir_path + schema_file_name;
343 external_fcns_path = config_dir_path + string("external_fcns.def");
344 string ifx_fname = config_dir_path + string("ifres.xml");
346 // Find interface query file(s).
348 gethostname(tmpstr,TMPSTRLEN);
351 hostname_len = strlen(tmpstr);
352 string ifq_fname = config_dir_path + (distributed_mode ? "cluster" : hostname) + string(".ifq");
353 vector<string> ifq_fls;
355 ifq_fls.push_back(ifq_fname);
358 // Get the field list, if it exists
359 string flist_fl = config_dir_path + "field_list.xml";
361 if((flf_in = fopen(flist_fl.c_str(), "r")) != NULL) {
362 fprintf(stderr,"Found field list file %s\n",flist_fl.c_str());
363 xml_leaves = new xml_t();
364 xmlParser_setfileinput(flf_in);
365 if(xmlParserparse()){
366 fprintf(stderr,"WARNING, could not parse field list file %s\n",flist_fl.c_str());
368 field_verifier = new field_list(xml_leaves);
373 if((table_schemas_in = fopen(schema_file_name.c_str(), "r")) == NULL) {
374 fprintf(stderr,"Can't open schema file %s\n%s\n",schema_file_name.c_str(),strerror(errno));
380 if(!(debug_only || hfta_only)){
381 if((lfta_out = fopen("lfta.c","w")) == NULL){
382 fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
388 // Get the output specification file.
390 // query, operator, operator_param, directory, bucketwidth, partitioning_se, n_partitions
391 string ospec_fl = "output_spec.cfg";
393 vector<ospec_str *> output_specs;
394 multimap<string, int> qname_to_ospec;
395 if((osp_in = fopen(ospec_fl.c_str(), "r")) != NULL) {
398 while(fgets(tmpstr,TMPSTRLEN,osp_in)){
400 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
401 if(tmpstr[0]!='\n' && tmpstr[0]!='\r' && tmpstr[0]!='\0' && tmpstr[0]!='#'){
403 // make operator type lowercase
405 for(tmpc=flds[1];*tmpc!='\0';++tmpc)
406 *tmpc = tolower(*tmpc);
408 ospec_str *tmp_ospec = new ospec_str();
409 tmp_ospec->query = flds[0];
410 tmp_ospec->operator_type = flds[1];
411 tmp_ospec->operator_param = flds[2];
412 tmp_ospec->output_directory = flds[3];
413 tmp_ospec->bucketwidth = atoi(flds[4]);
414 tmp_ospec->partitioning_flds = flds[5];
415 tmp_ospec->n_partitions = atoi(flds[6]);
416 qname_to_ospec.insert(pair<string,int>(tmp_ospec->query,output_specs.size()));
417 output_specs.push_back(tmp_ospec);
419 fprintf(stderr,"Warning, line %d corrupted in output_spec.cfg, has %d fields.\n",o_lineno,nflds);
425 fprintf(stderr,"output_spec.cfg not found. The query set has no output. exiting.\n");
430 string pspec_fl = "hfta_parallelism.cfg";
432 map<string, int> hfta_parallelism;
433 if((psp_in = fopen(pspec_fl.c_str(), "r")) != NULL){
436 while(fgets(tmpstr,TMPSTRLEN,psp_in)){
437 bool good_entry = true;
439 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
441 string hname = flds[0];
442 int par = atoi(flds[1]);
443 if(par <= 0 || par > n_virtual_interfaces){
444 fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must be between 1 and %d\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
447 if(good_entry && n_virtual_interfaces % par != 0){
448 fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must divide the number of virtual interfaces (%d), ignoring.\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
452 hfta_parallelism[hname] = par;
456 fprintf(stderr,"WARNING, no file %s found, using single copies of hftas.\n",pspec_fl.c_str());
460 // LFTA hash table sizes
461 string htspec_fl = "lfta_htsize.cfg";
462 FILE *htsp_in = NULL;
463 map<string, int> lfta_htsize;
464 if((htsp_in = fopen(htspec_fl.c_str(), "r")) != NULL){
467 while(fgets(tmpstr,TMPSTRLEN,htsp_in)){
468 bool good_entry = true;
470 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
472 string lfta_name = flds[0];
473 int htsz = atoi(flds[1]);
475 lfta_htsize[lfta_name] = htsz;
477 fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, htsize is %d, must be larger than 0.\n",o_lineno,htspec_fl.c_str(),htsz);
482 fprintf(stderr,"WARNING, no file %s found, using default LFTA hash table sizes.\n",htspec_fl.c_str());
485 // LFTA virtual interface hash split
486 string rtlspec_fl = "rts_load.cfg";
488 map<string, vector<int> > rts_hload;
489 if((rtl_in = fopen(rtlspec_fl.c_str(), "r")) != NULL){
494 while(fgets(tmpstr,TMPSTRLEN,rtl_in)){
495 bool good_entry = true;
499 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
501 iface_name = flds[0];
504 for(j=1;j<nflds;++j){
505 int h = atoi(flds[j]);
509 hload.push_back(cumm_h);
515 rts_hload[iface_name] = hload;
517 fprintf(stderr,"Warning, line %d of %s is malformed, skipping.\n",r_lineno, rtlspec_fl.c_str());
524 if(output_query_names){
525 if((query_name_output = fopen("query_names.txt","w")) == NULL){
526 fprintf(stderr,"Can't open output file %s\n%s\n","query_names.txt",strerror(errno));
531 if(output_schema_summary){
532 if((schema_summary_output = fopen("schema_summary.txt","w")) == NULL){
533 fprintf(stderr,"Can't open output file %s\n%s\n","schema_summary.txt",strerror(errno));
538 if((qtree_output = fopen("qtree.xml","w")) == NULL){
539 fprintf(stderr,"Can't open output file %s\n%s\n","qtree.xml",strerror(errno));
542 fprintf(qtree_output,"<?xml version='1.0' encoding='ISO-8859-1'?>\n");
543 fprintf(qtree_output,"<?xml-stylesheet type='text/xsl' href='qtree.xsl'?>\n");
544 fprintf(qtree_output,"<QueryNodes>\n");
547 // Get an initial Schema
550 // Parse the table schema definitions.
551 fta_parse_result = new fta_parse_t();
552 FtaParser_setfileinput(table_schemas_in);
553 if(FtaParserparse()){
554 fprintf(stderr,"Table schema parse failed.\n");
557 if(fta_parse_result->parse_type != TABLE_PARSE){
558 fprintf(stderr,"ERROR, file %s is not a table definition file.\n",schema_file_name.c_str());
561 Schema = fta_parse_result->tables;
563 // Ensure that all schema_ids, if set, are distinct.
564 // Obsolete? There is code elsewhere to ensure that schema IDs are
565 // distinct on a per-interface basis.
569 for(int t=0;t<Schema->size();++t){
570 int sch_id = Schema->get_table(t)->get_schema_id();
572 if(found_ids.find(sch_id) != found_ids.end()){
573 dup_ids.insert(sch_id);
575 found_ids.insert(sch_id);
578 if(dup_ids.size()>0){
579 fprintf(stderr, "Error, the schema has duplicate schema_ids:");
580 for(auto dit=dup_ids.begin();dit!=dup_ids.end(); ++dit)
581 fprintf(stderr," %d",(*dit));
582 fprintf(stderr,"\n");
589 // Process schema field inheritance
591 retval = Schema->unroll_tables(err_str);
593 fprintf(stderr,"Error processing schema field inheritance:\n %s\n", err_str.c_str() );
597 // hfta only => we will try to fetch schemas from the registry.
598 // therefore, start off with an empty schema.
599 Schema = new table_list();
603 // Open and parse the external functions file.
604 Ext_fcnsParserin = fopen(external_fcns_path.c_str(),"r");
605 if(Ext_fcnsParserin == NULL){
606 fprintf(stderr,"Warning, can't find external functions definition file (external_fcns.def).\n");
607 Ext_fcns = new ext_fcn_list();
609 if(Ext_fcnsParserparse()){
610 fprintf(stderr,"Warning, error parsing external functions definition file (external_fcns.def).\n");
611 Ext_fcns = new ext_fcn_list();
614 if(Ext_fcns->validate_fcns(err_str)){
615 fprintf(stderr,"Error processing external functions definition file (external_fcns.def):\n%s\n",err_str.c_str());
619 // Open and parse the interface resources file.
620 // ifq_t *ifaces_db = new ifq_t();
622 // if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
623 // fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
624 // ifx_fname.c_str(), ierr.c_str());
627 // if(ifaces_db->load_ifqs(ifq_fname, ierr)){
628 // fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
629 // ifq_fname.c_str(), ierr.c_str());
634 // The LFTA code string.
635 // Put the standard preamble here.
636 // NOTE: the hash macros, fcns should go into the run time
637 map<string, string> lfta_val;
638 map<string, string> lfta_prefilter_val;
641 "#include <limits.h>\n"
642 "#include \"rts.h\"\n"
643 "#include \"fta.h\"\n"
644 "#include \"lapp.h\"\n"
645 "#include \"rts_udaf.h\"\n"
646 "#include<stdio.h>\n"
647 "#include <float.h>\n"
648 "#include \"rdtsc.h\"\n"
649 "#include \"watchlist.h\"\n\n"
652 // Get any locally defined parsing headers
654 memset(&glob_result, 0, sizeof(glob_result));
656 // do the glob operation TODO should be from GSROOT
657 int return_value = glob("../../include/lfta/local/*h", GLOB_TILDE, NULL, &glob_result);
658 if(return_value == 0){
660 for(size_t i = 0; i < glob_result.gl_pathc; ++i) {
662 int nflds = split_string(glob_result.gl_pathv[i],'/',flds,1000);
663 lfta_header += "#include \"local/"+string(flds[nflds-1])+"\"\n";
667 fprintf(stderr,"Warning, glob on ../../include/lfta/local/*h failed.\n");
671 "#define IS_FILLED(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> ((bucket & 15)<<1)))\n"
672 "#define IS_NEW(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> (((bucket & 15) << 1) + 1)))\n"
673 "#define SET_EMPTY(bitmap,bucket) (bitmap[bucket >> 4] &= (~(0x80000000 >> ((bucket & 15)<<1))))\n"
674 "#define SET_FILLED_AND_NEW(bitmap,bucket) (bitmap[bucket >> 4] |= (0xC0000000 >> ((bucket & 15)<<1)))\n"
679 "gs_uint64_t (*lfta_prefilter)(void *pkt) = NULL; // set at fta_init\n"
681 "#define SLOT_FILLED 0x04\n"
682 "#define SLOT_GEN_BITS 0x03\n"
683 "#define SLOT_HASH_BITS 0xfffffff8\n"
684 "#define SET_BF_BIT(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num)) + (bf_index)) >> 3] |= (0x80 >> ((((bucket) * (bf_num)) + (bf_index)) & 7)))\n"
685 "#define IS_BF_SET(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num))+(bf_index)) >> 3] & (0x80 >> ((((bucket) * (bf_num))+(bf_index)) & 7)))\n"
686 "#define SET_BF_EMPTY(bf_table, bf_num, bf_index, bucket) (bf_table[((bucket * bf_num)+bf_index) >> 3] &= (~0x80 >> (((bucket * bf_num)+bf_index) & 7)))\n"
689 "#define lfta_BOOL_to_hash(x) (x)\n"
690 "#define lfta_USHORT_to_hash(x) (x)\n"
691 "#define lfta_UINT_to_hash(x) (x)\n"
692 "#define lfta_IP_to_hash(x) (x)\n"
693 "#define lfta_IPV6_to_hash(x) ( (x.v[0]) ^ (x.v[1]) ^ (x.v[2]) ^ (x.v[3]))\n"
694 "#define lfta_INT_to_hash(x) (gs_uint32_t)(x)\n"
695 "#define lfta_ULLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
696 "#define lfta_LLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
697 "#define lfta_FLOAT_to_hash(x) ( (( ((gs_uint64_t)(x)) >>32)&0xffffffff) ^ (((gs_uint64_t)(x))&0xffffffff) )\n"
698 "static gs_uint32_t lfta_V_STR_to_hash(struct gs_string x){\n"
699 " gs_uint32_t i,ret=0,tmp_sum = 0;\n"
700 " for(i=0;i<x.length;++i){\n"
701 " tmp_sum |= (x.data[i]) << (8*(i%4));\n"
707 " if((i%4)!=0) ret ^=tmp_sum;\n"
713 //////////////////////////////////////////////////////////////////
714 ///// Get all of the query parse trees
718 int hfta_count = 0; // for numeric suffixes to hfta .cc files
720 //---------------------------
721 // Global info needed for post processing.
723 // Set of operator views ref'd in the query set.
725 // lftas on a per-machine basis.
726 map<string, vector<stream_query *> > lfta_mach_lists;
727 int nfiles = input_file_names.size();
728 vector<stream_query *> hfta_list; // list of hftas.
729 map<string, stream_query *> sq_map; // map from query name to stream query.
732 //////////////////////////////////////////
734 // Open and parse the interface resources file.
735 ifq_t *ifaces_db = new ifq_t();
737 if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
738 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
739 ifx_fname.c_str(), ierr.c_str());
742 if(ifaces_db->load_ifqs(ifq_fls[0], ierr)){
743 fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
744 ifq_fls[0].c_str(), ierr.c_str());
748 map<string, string> qname_to_flname; // for detecting duplicate query names
752 // Parse the files to create a vector of parse trees.
753 // Load qnodes with information to perform a topo sort
754 // based on query dependencies.
755 vector<query_node *> qnodes; // for topo sort.
756 map<string,int> name_node_map; // map query name to qnodes entry
757 for(i=0;i<input_file_names.size();i++){
759 if((fta_in = fopen(input_file_names[i].c_str(), "r")) == NULL) {
760 fprintf(stderr,"Can't open input file %s\n%s",input_file_names[i].c_str(),strerror(errno));
763 fprintf(stderr,"Parsing file %s\n",input_file_names[i].c_str());
765 // Parse the FTA query
766 fta_parse_result = new fta_parse_t();
767 FtaParser_setfileinput(fta_in);
768 if(FtaParserparse()){
769 fprintf(stderr,"FTA parse failed.\n");
772 if(fta_parse_result->parse_type != QUERY_PARSE){
773 fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
777 // returns a list of parse trees
778 vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
779 for(p=0;p<qlist.size();++p){
780 table_exp_t *fta_parse_tree = qlist[p];
781 // query_parse_trees.push_back(fta_parse_tree);
783 // compute the default name -- extract from query name
784 strcpy(tmpstr,input_file_names[i].c_str());
785 char *qname = strrchr(tmpstr,PATH_DELIM);
790 char *qname_end = strchr(qname,'.');
791 if(qname_end != NULL) *qname_end = '\0';
792 string qname_str = qname;
793 string imputed_qname = impute_query_name(fta_parse_tree, qname_str);
795 // Determine visibility. Should I be attaching all of the output methods?
796 if(qname_to_ospec.count(imputed_qname)>0)
797 fta_parse_tree->set_visible(true);
799 fta_parse_tree->set_visible(false);
802 // Create a manipulable representation of the parse tree.
803 // the qnode inherits the visibility assigned to the parse tree.
804 int pos = qnodes.size();
805 qnodes.push_back( new query_node(pos, imputed_qname, input_file_names[i], fta_parse_tree ));
806 name_node_map[ qnodes[pos]->name ] = pos;
807 //printf("name_node_map[ %s ] = %d\n",qnodes[pos]->name.c_str(),pos);
808 // qnames.push_back(impute_query_name(fta_parse_tree, qname_str));
809 // qfiles.push_back(i);
811 // Check for duplicate query names
812 // NOTE : in hfta-only generation, I should
813 // also check with the names of the registered queries.
814 if(qname_to_flname.count(qnodes[pos]->name) > 0){
815 fprintf(stderr,"ERROR duplicate query name %s in files %s and %s.\n",
816 qnodes[pos]->name.c_str(), (qname_to_flname[qnodes[pos]->name]).c_str(), input_file_names[i].c_str());
819 if(Schema->find_tbl(qnodes[pos]->name) >= 0){
820 fprintf(stderr,"ERROR duplicate query name %s (file %s) was already defined as a PROTOCOL.\n",
821 qnodes[pos]->name.c_str(), input_file_names[i].c_str());
824 qname_to_flname[qnodes[pos]->name] = input_file_names[i].c_str();
830 // Add the library queries
833 for(pos=0;pos<qnodes.size();++pos){
835 for(fi = 0;fi<qnodes[pos]->refd_tbls.size();++fi){
836 string src_tbl = qnodes[pos]->refd_tbls[fi];
837 if(qname_to_flname.count(src_tbl) == 0){
838 int last_sep = src_tbl.find_last_of('/');
839 if(last_sep != string::npos){
840 fprintf(stderr,"Finding library query %s for query %s\n",src_tbl.c_str(),qnodes[pos]->name.c_str());
841 string target_qname = src_tbl.substr(last_sep+1);
842 string qpathname = library_path + src_tbl + ".gsql";
843 if((fta_in = fopen(qpathname.c_str(), "r")) == NULL) {
844 fprintf(stderr,"Can't open library file %s, referenced by query %s in file %s\n\t%s\n",qpathname.c_str(),qnodes[pos]->name.c_str(), qname_to_flname[qnodes[pos]->name].c_str(), strerror(errno));
846 fprintf(stderr,"After exit\n");
848 fprintf(stderr,"Parsing file %s\n",qpathname.c_str());
849 // Parse the FTA query
850 fta_parse_result = new fta_parse_t();
851 FtaParser_setfileinput(fta_in);
852 if(FtaParserparse()){
853 fprintf(stderr,"FTA parse failed.\n");
856 if(fta_parse_result->parse_type != QUERY_PARSE){
857 fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
861 map<string, int> local_query_map;
862 vector<string> local_query_names;
863 vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
864 for(p=0;p<qlist.size();++p){
865 table_exp_t *fta_parse_tree = qlist[p];
866 fta_parse_tree->set_visible(false); // assumed to not produce output
867 string imputed_qname = impute_query_name(fta_parse_tree, target_qname);
868 if(imputed_qname == target_qname)
869 imputed_qname = src_tbl;
870 if(local_query_map.count(imputed_qname)>0){
871 fprintf(stderr,"ERROR, multiple queries named %s in library file %s\n",imputed_qname.c_str(), qpathname.c_str());
874 local_query_map[ imputed_qname ] = p;
875 local_query_names.push_back(imputed_qname);
878 if(local_query_map.count(src_tbl)==0){
879 fprintf(stderr,"ERROR, library query file %s has no query named %s\n",qpathname.c_str(),target_qname.c_str());
883 vector<int> worklist;
884 set<int> added_queries;
885 vector<query_node *> new_qnodes;
886 worklist.push_back(local_query_map[target_qname]);
887 added_queries.insert(local_query_map[target_qname]);
889 int qpos = qnodes.size();
890 for(qq=0;qq<worklist.size();++qq){
891 int q_id = worklist[qq];
892 query_node *new_qnode = new query_node(qpos+qq, local_query_names[q_id], qpathname, qlist[q_id] );
893 new_qnodes.push_back( new_qnode);
894 vector<string> refd_tbls = new_qnode->refd_tbls;
896 for(ff = 0;ff<refd_tbls.size();++ff){
897 if(local_query_map.count(refd_tbls[ff])>0 && added_queries.count(local_query_map[refd_tbls[ff]])==0){
899 if(name_node_map.count(refd_tbls[ff])>0){
900 fprintf(stderr,"ERROR, query %s occurs both in the regular query set, file %s, and in library file %s\n",refd_tbls[ff].c_str(), qname_to_flname[refd_tbls[ff]].c_str(), qpathname.c_str() );
903 worklist.push_back(local_query_map[refd_tbls[ff]]);
909 for(qq=0;qq<new_qnodes.size();++qq){
910 int qpos = qnodes.size();
911 qnodes.push_back(new_qnodes[qq]);
912 name_node_map[qnodes[qpos]->name ] = qpos;
913 qname_to_flname[qnodes[qpos]->name ] = qpathname;
927 //---------------------------------------
932 string udop_missing_sources;
933 for(i=0;i<qnodes.size();++i){
935 for(fi = 0;fi<qnodes[i]->refd_tbls.size();++fi){
936 int sid = Schema->find_tbl(qnodes[i]->refd_tbls[fi]);
938 if(Schema->get_schema_type(sid) == OPERATOR_VIEW_SCHEMA){
939 if(name_node_map.count(qnodes[i]->refd_tbls[fi]) == 0){
940 int pos = qnodes.size();
941 qnodes.push_back( new query_node(pos, qnodes[i]->refd_tbls[fi], Schema));
942 name_node_map[ qnodes[pos]->name ] = pos;
943 qnodes[pos]->is_externally_visible = false; // its visible
944 // Need to mark the source queries as visible.
946 string missing_sources = "";
947 for(si=0;si<qnodes[pos]->refd_tbls.size();++si){
948 string src_tbl = qnodes[pos]->refd_tbls[si];
949 if(name_node_map.count(src_tbl)==0){
950 missing_sources += src_tbl + " ";
953 if(missing_sources != ""){
954 udop_missing_sources += "\tUDOP "+qnodes[pos]->name+" references undefined tables "+missing_sources+"\n";
961 if(udop_missing_sources != ""){
962 fprintf(stderr,"ERROR, User Defined OPerators reference source tables that are not part of the query set:\n%s",udop_missing_sources.c_str());
968 ////////////////////////////////////////////////////////////////////
969 /// Check parse trees to verify that some
970 /// global properties are met :
971 /// if q1 reads from q2, then
972 /// q2 is processed before q1
973 /// q1 can supply q2's parameters
974 /// Verify there is no cycle in the reads-from graph.
976 // Compute an order in which to process the
979 // Start by building the reads-from lists.
982 for(i=0;i<qnodes.size();++i){
984 vector<string> refd_tbls = qnodes[i]->refd_tbls;
985 for(fi = 0;fi<refd_tbls.size();++fi){
986 if(name_node_map.count(refd_tbls[fi])>0){
987 //printf("query %d (%s) reads from %s (%d)\n", i, qnodes[i]->name.c_str(),refd_tbls[fi].c_str(),name_node_map[refd_tbls[fi]]);
988 (qnodes[i]->reads_from).insert(name_node_map[refd_tbls[fi]]);
994 // If one query reads the result of another,
995 // check for parameter compatibility. Currently it must
996 // be an exact match. I will move to requiring
997 // containment after re-ordering, but will require
998 // some analysis for code generation which is not
1000 //printf("There are %d query nodes.\n",qnodes.size());
1003 for(i=0;i<qnodes.size();++i){
1004 vector<var_pair_t *> target_params = qnodes[i]->params;
1005 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1006 vector<var_pair_t *> source_params = qnodes[(*si)]->params;
1007 if(target_params.size() != source_params.size()){
1008 fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
1012 for(p=0;p<target_params.size();++p){
1013 if(! (target_params[p]->name == source_params[p]->name &&
1014 target_params[p]->val == source_params[p]->val ) ){
1015 fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
1024 // Start by counting inedges.
1025 for(i=0;i<qnodes.size();++i){
1026 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1027 qnodes[(*si)]->n_consumers++;
1031 // The roots are the nodes with indegree zero.
1033 for(i=0;i<qnodes.size();++i){
1034 if(qnodes[i]->n_consumers == 0){
1035 if(qnodes[i]->is_externally_visible == false){
1036 fprintf(stderr,"WARNING: query %s (file %s) is a root query but it isn't visible. Ignored.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str());
1042 // Remove the parts of the subtree that produce no output.
1043 set<int> valid_roots;
1044 set<int> discarded_nodes;
1045 set<int> candidates;
1046 while(roots.size() >0){
1047 for(si=roots.begin();si!=roots.end();++si){
1048 if(qnodes[(*si)]->is_externally_visible){
1049 valid_roots.insert((*si));
1051 discarded_nodes.insert((*si));
1052 set<int>::iterator sir;
1053 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1054 qnodes[(*sir)]->n_consumers--;
1055 if(qnodes[(*sir)]->n_consumers == 0)
1056 candidates.insert( (*sir));
1063 roots = valid_roots;
1064 if(discarded_nodes.size()>0){
1065 fprintf(stderr,"Warning, the following queries were discarded because they produce no output:\n");
1067 for(si=discarded_nodes.begin();si!=discarded_nodes.end();++si){
1068 if(di>0 && (di%8)==0) fprintf(stderr,"\n");
1070 fprintf(stderr," %s",qnodes[(*si)]->name.c_str());
1072 fprintf(stderr,"\n");
1075 // Compute the sources_to set, ignoring discarded nodes.
1076 for(i=0;i<qnodes.size();++i){
1077 if(discarded_nodes.count(i)==0)
1078 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1079 qnodes[(*si)]->sources_to.insert(i);
1084 // Find the nodes that are shared by multiple visible subtrees.
1085 // The roots become inferred visible nodes.
1087 // Find the visible nodes.
1088 vector<int> visible_nodes;
1089 for(i=0;i<qnodes.size();i++){
1090 if(qnodes[i]->is_externally_visible){
1091 visible_nodes.push_back(i);
1095 // Find UDOPs referenced by visible nodes.
1097 for(i=0;i<visible_nodes.size();++i){
1098 workq.push_back(visible_nodes[i]);
1100 while(!workq.empty()){
1101 int node = workq.front();
1103 set<int>::iterator children;
1104 if(qnodes[node]->is_udop && qnodes[node]->is_externally_visible == false){
1105 qnodes[node]->is_externally_visible = true;
1106 visible_nodes.push_back(node);
1107 for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1108 if(qnodes[(*children)]->is_externally_visible == false){
1109 qnodes[(*children)]->is_externally_visible = true;
1110 visible_nodes.push_back((*children));
1114 for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1115 workq.push_back((*children));
1122 for(i=0;i<qnodes.size();i++){
1123 qnodes[i]->subtree_roots.clear();
1126 // Walk the tree defined by a visible node, not descending into
1127 // subtrees rooted by a visible node. Mark the node visited with
1128 // the visible node ID.
1129 for(i=0;i<visible_nodes.size();++i){
1131 vroots.insert(visible_nodes[i]);
1132 while(vroots.size()>0){
1133 for(si=vroots.begin();si!=vroots.end();++si){
1134 qnodes[(*si)]->subtree_roots.insert(visible_nodes[i]);
1136 set<int>::iterator sir;
1137 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1138 if(! qnodes[(*sir)]->is_externally_visible){
1139 candidates.insert( (*sir));
1143 vroots = candidates;
1147 // Find the nodes in multiple visible node subtrees, but with no parent
1148 // that is in multiple visible node subtrees. Mark these as inferred visible nodes.
1149 done = true; // until proven otherwise
1150 for(i=0;i<qnodes.size();i++){
1151 if(qnodes[i]->subtree_roots.size()>1){
1152 bool is_new_root = true;
1153 set<int>::iterator sir;
1154 for(sir=qnodes[i]->sources_to.begin(); sir!=qnodes[i]->sources_to.end();++sir){
1155 if(qnodes[(*sir)]->subtree_roots.size()>1)
1156 is_new_root = false;
1159 qnodes[i]->is_externally_visible = true;
1160 qnodes[i]->inferred_visible_node = true;
1161 visible_nodes.push_back(i);
1172 // get visible nodes in topo ordering.
1173 // for(i=0;i<qnodes.size();i++){
1174 // qnodes[i]->n_consumers = qnodes[i]->sources_to.size();
1176 vector<int> process_order;
1177 while(roots.size() >0){
1178 for(si=roots.begin();si!=roots.end();++si){
1179 if(discarded_nodes.count((*si))==0){
1180 process_order.push_back( (*si) );
1182 set<int>::iterator sir;
1183 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1184 qnodes[(*sir)]->n_consumers--;
1185 if(qnodes[(*sir)]->n_consumers == 0)
1186 candidates.insert( (*sir));
1194 //printf("process_order.size() =%d\n",process_order.size());
1196 // Search for cyclic dependencies
1198 for(i=0;i<qnodes.size();++i){
1199 if(discarded_nodes.count(i)==0 && qnodes[i]->n_consumers > 0){
1200 if(found_dep.size() != 0) found_dep += ", ";
1201 found_dep += "query "+qnodes[i]->name+" (file "+qnodes[i]->file+")";
1204 if(found_dep.size()>0){
1205 fprintf(stderr,"ERROR, the following queries contain a cyclic reads-from dependency:\n%s\n",found_dep.c_str());
1209 // Get a list of query sets, in the order to be processed.
1210 // Start at visible root and do bfs.
1211 // The query set includes queries referenced indirectly,
1212 // as sources for user-defined operators. These are needed
1213 // to ensure that they are added to the schema, but are not part
1214 // of the query tree.
1216 // stream_node_sets contains queries reachable only through the
1217 // FROM clause, so I can tell which queries to add to the stream
1218 // query. (DISABLED, UDOPS are integrated, does this cause problems?)
1220 // NOTE: this code works because in order for data to be
1221 // read by multiple hftas, the node must be externally visible.
1222 // But visible nodes define roots of process sets.
1223 // internally visible nodes can feed data only
1224 // to other nodes in the same query file.
1225 // Therefore, any access can be restricted to a file,
1226 // hfta output sharing is done only on roots
1227 // never on interior nodes.
1232 // Compute the base collection of hftas.
1233 vector<hfta_node *> hfta_sets;
1234 map<string, int> hfta_name_map;
1235 // vector< vector<int> > process_sets;
1236 // vector< set<int> > stream_node_sets;
1237 reverse(process_order.begin(), process_order.end()); // get listing in reverse order.
1238 // i.e. process leaves 1st.
1239 for(i=0;i<process_order.size();++i){
1240 if(qnodes[process_order[i]]->is_externally_visible == true){
1241 //printf("Visible.\n");
1242 int root = process_order[i];
1243 hfta_node *hnode = new hfta_node();
1244 hnode->name = qnodes[root]-> name;
1245 hnode->source_name = qnodes[root]-> name;
1246 hnode->is_udop = qnodes[root]->is_udop;
1247 hnode->inferred_visible_node = qnodes[root]->inferred_visible_node;
1249 vector<int> proc_list; proc_list.push_back(root);
1250 // Ensure that nodes are added only once.
1251 set<int> proc_set; proc_set.insert(root);
1252 roots.clear(); roots.insert(root);
1254 while(roots.size()>0){
1255 for(si=roots.begin();si!=roots.end();++si){
1256 //printf("Processing root %d\n",(*si));
1257 set<int>::iterator sir;
1258 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1259 //printf("reads fom %d\n",(*sir));
1260 if(qnodes[(*sir)]->is_externally_visible==false){
1261 candidates.insert( (*sir) );
1262 if(proc_set.count( (*sir) )==0){
1263 proc_set.insert( (*sir) );
1264 proc_list.push_back( (*sir) );
1273 reverse(proc_list.begin(), proc_list.end());
1274 hnode->query_node_indices = proc_list;
1275 hfta_name_map[hnode->name] = hfta_sets.size();
1276 hfta_sets.push_back(hnode);
1280 // Compute the reads_from / sources_to graphs for the hftas.
1282 for(i=0;i<hfta_sets.size();++i){
1283 hfta_node *hnode = hfta_sets[i];
1284 for(q=0;q<hnode->query_node_indices.size();q++){
1285 query_node *qnode = qnodes[hnode->query_node_indices[q]];
1286 for(s=0;s<qnode->refd_tbls.size();++s){
1287 if(hfta_name_map.count(qnode->refd_tbls[s])){
1288 int other_hfta = hfta_name_map[qnode->refd_tbls[s]];
1289 hnode->reads_from.insert(other_hfta);
1290 hfta_sets[other_hfta]->sources_to.insert(i);
1296 // Compute a topological sort of the hfta_sets.
1298 vector<int> hfta_topsort;
1300 int hnode_srcs[hfta_sets.size()];
1301 for(i=0;i<hfta_sets.size();++i){
1303 if(hfta_sets[i]->sources_to.size() == 0)
1307 while(! workq.empty()){
1308 int node = workq.front();
1310 hfta_topsort.push_back(node);
1311 set<int>::iterator stsi;
1312 for(stsi=hfta_sets[node]->reads_from.begin();stsi!=hfta_sets[node]->reads_from.end();++stsi){
1313 int parent = (*stsi);
1314 hnode_srcs[parent]++;
1315 if(hnode_srcs[parent] == hfta_sets[parent]->sources_to.size()){
1316 workq.push_back(parent);
1321 // Decorate hfta nodes with the level of parallelism given as input.
1323 map<string, int>::iterator msii;
1324 for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1325 string hfta_name = (*msii).first;
1326 int par = (*msii).second;
1327 if(hfta_name_map.count(hfta_name) > 0){
1328 hfta_sets[ hfta_name_map[hfta_name] ]->n_parallel = par;
1330 fprintf(stderr,"Warning, hfta_parallelism.cfg has an entry for %s, but its not a hfta.\n",hfta_name.c_str());
1334 // Propagate levels of parallelism: children should have a level of parallelism
1335 // as large as any of its parents. Adjust children upwards to compensate.
1336 // Start at parents and adjust children, auto-propagation will occur.
1338 for(i=hfta_sets.size()-1;i>=0;i--){
1339 set<int>::iterator stsi;
1340 for(stsi=hfta_sets[i]->reads_from.begin();stsi!=hfta_sets[i]->reads_from.end();++stsi){
1341 if(hfta_sets[i]->n_parallel > hfta_sets[ (*stsi) ]->n_parallel){
1342 hfta_sets[ (*stsi) ]->n_parallel = hfta_sets[i]->n_parallel;
1347 // Before all the name mangling, check if there are any output_spec.cfg
1348 // or hfta_parallelism.cfg entries that do not have a matching query.
1350 string dangling_ospecs = "";
1351 for(msii=qname_to_ospec.begin();msii!=qname_to_ospec.end();++msii){
1352 string oq = (*msii).first;
1353 if(hfta_name_map.count(oq) == 0){
1354 dangling_ospecs += " "+(*msii).first;
1357 if(dangling_ospecs!=""){
1358 fprintf(stderr,"WARNING, the following entries in output_spec.cfg don't have a matching query: %s\n",dangling_ospecs.c_str());
1362 string dangling_par = "";
1363 for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1364 string oq = (*msii).first;
1365 if(hfta_name_map.count(oq) == 0){
1366 dangling_par += " "+(*msii).first;
1369 if(dangling_par!=""){
1370 fprintf(stderr,"WARNING, the following entries in hfta_parallelism.cfg don't have a matching query: %s\n",dangling_par.c_str());
1375 // Replicate parallelized hftas. Do __copyX name mangling. Adjust
1376 // FROM clauses: retarget any name which is an internal node, and
1377 // any which is in hfta_sets (and which is parallelized). Add Merge nodes
1378 // when the source hfta has more parallelism than the target node.
1379 // Add new nodes to qnodes and hfta_nodes with the appropriate linkages.
1382 int n_original_hfta_sets = hfta_sets.size();
1383 for(i=0;i<n_original_hfta_sets;++i){
1384 if(hfta_sets[i]->n_parallel > 1){
1385 hfta_sets[i]->do_generation =false; // set the deletion flag for this entry.
1386 set<string> local_nodes; // names of query nodes in the hfta.
1387 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1388 local_nodes.insert(qnodes[ hfta_sets[i]->query_node_indices[h] ]->name);
1391 for(p=0;p<hfta_sets[i]->n_parallel;++p){
1392 string mangler = "__copy"+int_to_string(p);
1393 hfta_node *par_hfta = new hfta_node();
1394 par_hfta->name = hfta_sets[i]->name + mangler;
1395 par_hfta->source_name = hfta_sets[i]->name;
1396 par_hfta->is_udop = hfta_sets[i]->is_udop;
1397 par_hfta->inferred_visible_node = hfta_sets[i]->inferred_visible_node;
1398 par_hfta->n_parallel = hfta_sets[i]->n_parallel;
1399 par_hfta->parallel_idx = p;
1401 map<string, int> par_qnode_map; // qnode name-to-idx, aids dependency tracking.
1404 if(hfta_sets[i]->is_udop){
1405 int root = hfta_sets[i]->query_node_indices[0];
1407 string unequal_par_sources;
1408 set<int>::iterator rfsii;
1409 for(rfsii=hfta_sets[i]->reads_from.begin();rfsii!=hfta_sets[i]->reads_from.end();++rfsii){
1410 if(hfta_sets[(*rfsii)]->n_parallel != par_hfta->n_parallel){
1411 unequal_par_sources = hfta_sets[(*rfsii)]->name+" ("+int_to_string(hfta_sets[(*rfsii)]->n_parallel)+") ";
1414 if(unequal_par_sources != ""){
1415 fprintf(stderr,"ERROR, UDOP %s has parallelism %d, but some of its sources have a different parallelism: %s\n",hfta_sets[i]->name.c_str(), hfta_sets[i]->n_parallel, unequal_par_sources.c_str());
1420 vector<string> new_sources;
1421 for(rti=0;rti<qnodes[root]->refd_tbls.size();++rti){
1422 new_sources.push_back(qnodes[root]->refd_tbls[rti]+mangler);
1425 query_node *new_qn = new query_node(qnodes.size(), qnodes[root]->name, Schema);
1426 new_qn->name += mangler;
1427 new_qn->mangler = mangler;
1428 new_qn->refd_tbls = new_sources;
1429 par_hfta->query_node_indices.push_back(qnodes.size());
1430 par_qnode_map[new_qn->name] = qnodes.size();
1431 name_node_map[ new_qn->name ] = qnodes.size();
1432 qnodes.push_back(new_qn);
1434 // regular query node
1435 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1436 int hqn_idx = hfta_sets[i]->query_node_indices[h];
1437 table_exp_t *dup_pt = dup_table_exp(qnodes[hqn_idx]->parse_tree);
1438 // rehome the from clause on mangled names.
1439 // create merge nodes as needed for external sources.
1440 for(f=0;f<dup_pt->fm->tlist.size();++f){
1441 if(local_nodes.count(dup_pt->fm->tlist[f]->schema_name)>0){
1442 dup_pt->fm->tlist[f]->schema_name += mangler;
1443 }else if(hfta_name_map.count(dup_pt->fm->tlist[f]->schema_name)>0){
1444 // Ref's an external HFTA. No parallelism => leave as is. Else, if level of parallelism of the two hftas is the same, rename by the mangler. Else, there must be more sources, so create a merge node.
1445 int other_hidx = hfta_name_map[dup_pt->fm->tlist[f]->schema_name];
1446 if(par_hfta->n_parallel == hfta_sets[other_hidx]->n_parallel){
1447 dup_pt->fm->tlist[f]->schema_name += mangler;
1449 vector<string> src_tbls;
1450 int stride = hfta_sets[other_hidx]->n_parallel / par_hfta->n_parallel;
1452 fprintf(stderr,"INTERNAL ERROR, parent hfta %s has a larger parallelism than its child %s\n",par_hfta->name.c_str(), hfta_sets[other_hidx]->name.c_str());
1455 for(s=0;s<stride;++s){
1456 string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s+stride*p);
1457 src_tbls.push_back(ext_src_name);
1459 table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1460 string merge_node_name = qnodes[hqn_idx]->name+"__copy"+int_to_string(p)+"__merge"+int_to_string(f);
1461 dup_pt->fm->tlist[f]->schema_name = merge_node_name;
1462 // Make a qnode to represent the new merge node
1463 query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1464 qn_pt->refd_tbls = src_tbls;
1465 qn_pt->is_udop = false;
1466 qn_pt->is_externally_visible = false;
1467 qn_pt->inferred_visible_node = false;
1468 par_hfta->query_node_indices.push_back(qnodes.size());
1469 par_qnode_map[merge_node_name] = qnodes.size();
1470 name_node_map[ merge_node_name ] = qnodes.size();
1471 qnodes.push_back(qn_pt);
1475 query_node *new_qn = new query_node(qnodes.size(),qnodes[hqn_idx]->name+"__copy"+int_to_string(p),qnodes[hqn_idx]->file,dup_pt);
1476 for(f=0;f<dup_pt->fm->tlist.size();++f){
1477 new_qn->refd_tbls.push_back(dup_pt->fm->tlist[f]->schema_name);
1479 new_qn->params = qnodes[hqn_idx]->params;
1480 new_qn->is_udop = false;
1481 new_qn->is_externally_visible = qnodes[hqn_idx]->is_externally_visible;
1482 new_qn->inferred_visible_node = qnodes[hqn_idx]->inferred_visible_node;
1483 par_hfta->query_node_indices.insert(par_hfta->query_node_indices.begin(),qnodes.size());
1484 par_qnode_map[new_qn->name] = qnodes.size();
1485 name_node_map[ new_qn->name ] = qnodes.size();
1486 qnodes.push_back(new_qn);
1489 hfta_name_map[par_hfta->name] = hfta_sets.size();
1490 hfta_sets.push_back(par_hfta);
1493 // This hfta isn't being parallelized, but add merge nodes for any parallelized
1495 if(!hfta_sets[i]->is_udop){
1496 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1497 int hqn_idx = hfta_sets[i]->query_node_indices[h];
1498 for(f=0;f<qnodes[hqn_idx]->parse_tree->fm->tlist.size();++f){
1499 if(hfta_name_map.count(qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name)>0){
1500 // Ref's an external HFTA. No parallelism => leave as is. Else, if level of parallelism of the two hftas is the same, rename by the mangler. Else, there must be more sources, so create a merge node.
1501 int other_hidx = hfta_name_map[qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name];
1502 if(hfta_sets[i]->n_parallel != hfta_sets[other_hidx]->n_parallel){
1503 vector<string> src_tbls;
1504 for(s=0;s<hfta_sets[other_hidx]->n_parallel;++s){
1505 string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s);
1506 src_tbls.push_back(ext_src_name);
1508 table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1509 string merge_node_name = qnodes[hqn_idx]->name+"__merge"+int_to_string(f);
1510 qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name = merge_node_name;
1511 // Make a qnode to represent the new merge node
1512 query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1513 qn_pt->refd_tbls = src_tbls;
1514 qn_pt->is_udop = false;
1515 qn_pt->is_externally_visible = false;
1516 qn_pt->inferred_visible_node = false;
1517 hfta_sets[i]->query_node_indices.insert(hfta_sets[i]->query_node_indices.begin(),qnodes.size());
1518 name_node_map[ merge_node_name ] = qnodes.size();
1519 qnodes.push_back(qn_pt);
1528 // Rebuild the reads_from / sources_to lists in the qnodes
1529 for(q=0;q<qnodes.size();++q){
1530 qnodes[q]->reads_from.clear();
1531 qnodes[q]->sources_to.clear();
1533 for(q=0;q<qnodes.size();++q){
1534 for(s=0;s<qnodes[q]->refd_tbls.size();++s){
1535 if(name_node_map.count(qnodes[q]->refd_tbls[s])>0){
1536 int rf = name_node_map[qnodes[q]->refd_tbls[s]];
1537 qnodes[q]->reads_from.insert(rf);
1538 qnodes[rf]->sources_to.insert(q);
1543 // Rebuild the reads_from / sources_to lists in hfta_sets
1544 for(q=0;q<hfta_sets.size();++q){
1545 hfta_sets[q]->reads_from.clear();
1546 hfta_sets[q]->sources_to.clear();
1548 for(q=0;q<hfta_sets.size();++q){
1549 for(s=0;s<hfta_sets[q]->query_node_indices.size();++s){
1550 int node = hfta_sets[q]->query_node_indices[s];
1551 set<int>::iterator rfsii;
1552 for(rfsii=qnodes[node]->reads_from.begin();rfsii!=qnodes[node]->reads_from.end();++rfsii){
1553 if(hfta_name_map.count(qnodes[(*rfsii)]->name)>0){
1554 hfta_sets[q]->reads_from.insert(hfta_name_map[qnodes[(*rfsii)]->name]);
1555 hfta_sets[hfta_name_map[qnodes[(*rfsii)]->name]]->sources_to.insert(q);
1562 for(q=0;q<qnodes.size();++q){
1563 printf("qnode %d reads-from %d:",q,qnodes[q]->reads_from.size());
1564 set<int>::iterator rsii;
1565 for(rsii=qnodes[q]->reads_from.begin();rsii!=qnodes[q]->reads_from.end();++rsii)
1566 printf(" %d",(*rsii));
1567 printf(", and sources-to %d:",qnodes[q]->sources_to.size());
1568 for(rsii=qnodes[q]->sources_to.begin();rsii!=qnodes[q]->sources_to.end();++rsii)
1569 printf(" %d",(*rsii));
1573 for(q=0;q<hfta_sets.size();++q){
1574 if(hfta_sets[q]->do_generation==false)
1576 printf("hfta %d (%s) reads-from %d:",q,hfta_sets[q]->name.c_str(),hfta_sets[q]->reads_from.size());
1577 set<int>::iterator rsii;
1578 for(rsii=hfta_sets[q]->reads_from.begin();rsii!=hfta_sets[q]->reads_from.end();++rsii)
1579 printf(" %d",(*rsii));
1580 printf(", and sources-to %d:",hfta_sets[q]->sources_to.size());
1581 for(rsii=hfta_sets[q]->sources_to.begin();rsii!=hfta_sets[q]->sources_to.end();++rsii)
1582 printf(" %d",(*rsii));
1589 // Re-topo sort the hftas
1590 hfta_topsort.clear();
1592 int hnode_srcs_2[hfta_sets.size()];
1593 for(i=0;i<hfta_sets.size();++i){
1594 hnode_srcs_2[i] = 0;
1595 if(hfta_sets[i]->sources_to.size() == 0 && hfta_sets[i]->do_generation){
1600 while(workq.empty() == false){
1601 int node = workq.front();
1603 hfta_topsort.push_back(node);
1604 set<int>::iterator stsii;
1605 for(stsii=hfta_sets[node]->reads_from.begin();stsii!=hfta_sets[node]->reads_from.end();++stsii){
1606 int child = (*stsii);
1607 hnode_srcs_2[child]++;
1608 if(hnode_srcs_2[child] == hfta_sets[child]->sources_to.size()){
1609 workq.push_back(child);
1614 // Ensure that all of the query_node_indices in hfta_sets are topologically
1615 // sorted, don't rely on assumptions that all transforms maintain some kind of order.
1616 for(i=0;i<hfta_sets.size();++i){
1617 if(hfta_sets[i]->do_generation){
1618 map<int,int> n_accounted;
1619 vector<int> new_order;
1621 vector<int>::iterator vii;
1622 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1623 n_accounted[(*vii)]= 0;
1625 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1626 set<int>::iterator rfsii;
1627 for(rfsii=qnodes[(*vii)]->reads_from.begin();rfsii!=qnodes[(*vii)]->reads_from.end();++rfsii){
1628 if(n_accounted.count((*rfsii)) == 0){
1629 n_accounted[(*vii)]++;
1632 if(n_accounted[(*vii)] == qnodes[(*vii)]->reads_from.size()){
1633 workq.push_back((*vii));
1637 while(workq.empty() == false){
1638 int node = workq.front();
1640 new_order.push_back(node);
1641 set<int>::iterator stsii;
1642 for(stsii=qnodes[node]->sources_to.begin();stsii!=qnodes[node]->sources_to.end();++stsii){
1643 if(n_accounted.count((*stsii))){
1644 n_accounted[(*stsii)]++;
1645 if(n_accounted[(*stsii)] == qnodes[(*stsii)]->reads_from.size()){
1646 workq.push_back((*stsii));
1651 hfta_sets[i]->query_node_indices = new_order;
1659 /// Global checking is done, start the analysis and translation
1660 /// of the query parse tree in the order specified by process_order
1663 // Get a list of the LFTAs for global lfta optimization
1664 // TODO: separate building operators from spliting lftas,
1665 // that will make optimizations such as predicate pushing easier.
1666 vector<stream_query *> lfta_list;
1667 stream_query *rootq;
1670 map<string, map<int, string> > schema_of_schemaid; // ensure that schema IDs are unique to an <interface, schema>
1672 for(qi=hfta_topsort.size()-1;qi>=0;--qi){
1674 int hfta_id = hfta_topsort[qi];
1675 vector<int> curr_list = hfta_sets[hfta_id]->query_node_indices;
1679 // Two possibilities, either its a UDOP, or its a collection of queries.
1680 // if(qnodes[curr_list.back()]->is_udop)
1681 if(hfta_sets[hfta_id]->is_udop){
1682 int node_id = curr_list.back();
1683 int udop_schref = Schema->find_tbl(qnodes[node_id]->file);
1684 opview_entry *opv = new opview_entry();
1686 // Many of the UDOP properties aren't currently used.
1687 opv->parent_qname = "no_parent";
1688 opv->root_name = qnodes[node_id]->name;
1689 opv->view_name = qnodes[node_id]->file;
1691 sprintf(tmpstr,"%s_UDOP%d_%s",qnodes[node_id]->name.c_str(),qi,opv->view_name.c_str());
1692 opv->udop_alias = tmpstr;
1693 opv->mangler = qnodes[node_id]->mangler;
1695 if(opv->mangler != ""){
1696 int new_udop_schref = Schema->add_duplicate_table(opv->view_name,opv->root_name);
1697 Schema->mangle_subq_names(new_udop_schref,opv->mangler);
1700 // This piece of code makes each hfta which refers to the same udop
1701 // reference a distinct running udop. Do this at query optimization time?
1702 // fmtbl->set_udop_alias(opv->udop_alias);
1704 opv->exec_fl = Schema->get_op_prop(udop_schref, string("file"));
1705 opv->liveness_timeout = atoi(Schema->get_op_prop(udop_schref, string("liveness_timeout")).c_str());
1707 vector<subquery_spec *> subq = Schema->get_subqueryspecs(udop_schref);
1709 for(s=0;s<subq.size();++s){
1710 // Validate that the fields match.
1711 subquery_spec *sqs = subq[s];
1712 string subq_name = sqs->name + opv->mangler;
1713 vector<field_entry *> flds = Schema->get_fields(subq_name);
1714 if(flds.size() == 0){
1715 fprintf(stderr,"INTERNAL ERROR: the subquery %s of view %s not found in Schema.\n",subq_name.c_str(), opv->view_name.c_str());
1718 if(flds.size() < sqs->types.size()){
1719 fprintf(stderr,"ERROR: subquery %s of view %s does not have enough fields (%lu found, %lu expected).\n",subq_name.c_str(), opv->view_name.c_str(),flds.size(), sqs->types.size());
1722 bool failed = false;
1723 for(f=0;f<sqs->types.size();++f){
1724 data_type dte(sqs->types[f],sqs->modifiers[f]);
1725 data_type dtf(flds[f]->get_type(),flds[f]->get_modifier_list());
1726 if(! dte.subsumes_type(&dtf) ){
1727 fprintf(stderr,"ERROR: subquery %s of view %s does not have the correct type for field %d (%s found, %s expected).\n",subq_name.c_str(), opv->view_name.c_str(),f,dtf.to_string().c_str(), dte.to_string().c_str());
1731 if(dte.is_temporal() && (dte.get_temporal() != dtf.get_temporal()) ){
1732 string pstr = dte.get_temporal_string();
1733 fprintf(stderr,"ERROR: subquery %s of view %s does not have the expected temporal value %s of field %d.\n",sqs->name.c_str(), opv->view_name.c_str(),pstr.c_str(),f);
1740 /// Validation done, find the subquery, make a copy of the
1741 /// parse tree, and add it to the return list.
1742 for(q=0;q<qnodes.size();++q)
1743 if(qnodes[q]->name == subq_name)
1745 if(q==qnodes.size()){
1746 fprintf(stderr,"INTERNAL ERROR: subquery %s of view %s not found in list of query names.\n",subq_name.c_str(), opv->view_name.c_str());
1752 // Cross-link to from entry(s) in all sourced-to tables.
1753 set<int>::iterator sii;
1754 for(sii=qnodes[curr_list.back()]->sources_to.begin() ;sii!=qnodes[curr_list.back()]->sources_to.end();++sii){
1755 //printf("\tUDOP %s sources_to %d (%s)\n",hfta_sets[hfta_id]->name.c_str(),(*sii),hfta_sets[(*sii)]->name.c_str());
1756 vector<tablevar_t *> tblvars = qnodes[(*sii)]->parse_tree->get_from()->get_table_list();
1758 for(ii=0;ii<tblvars.size();++ii){
1759 if(tblvars[ii]->schema_name == opv->root_name){
1760 tblvars[ii]->set_opview_idx(opviews.size());
1766 opviews.append(opv);
1769 // Analyze the parse trees in this query,
1770 // put them in rootq
1771 // vector<int> curr_list = process_sets[qi];
1774 ////////////////////////////////////////
1777 //printf("Process set %d, has %d queries\n",qi,curr_list.size());
1778 for(qj=0;qj<curr_list.size();++qj){
1780 fprintf(stderr,"Processing query %s (file %s) is_udop = %d\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(),qnodes[i]->is_udop);
1782 // Select the current query parse tree
1783 table_exp_t *fta_parse_tree = qnodes[i]->parse_tree;
1785 // if hfta only, try to fetch any missing schemas
1786 // from the registry (using the print_schema program).
1787 // Here I use a hack to avoid analyzing the query -- all referenced
1788 // tables must be in the from clause
1789 // If there is a problem loading any table, just issue a warning,
1791 tablevar_list_t *fm = fta_parse_tree->get_from();
1792 vector<string> refd_tbls = fm->get_src_tbls(Schema);
1793 // iterate over all referenced tables
1795 for(t=0;t<refd_tbls.size();++t){
1796 int tbl_ref = Schema->get_table_ref(refd_tbls[t]);
1798 if(tbl_ref < 0){ // if this table is not in the Schema
1801 string cmd="print_schema "+refd_tbls[t];
1802 FILE *schema_in = popen(cmd.c_str(), "r");
1803 if(schema_in == NULL){
1804 fprintf(stderr,"ERROR: cannot execute command %s\n",cmd.c_str());
1806 string schema_instr;
1807 while(fgets(tmpstr,TMPSTRLEN,schema_in)){
1808 schema_instr += tmpstr;
1810 fta_parse_result = new fta_parse_t();
1811 strcpy(tmp_schema_str,schema_instr.c_str());
1812 FtaParser_setstringinput(tmp_schema_str);
1813 if(FtaParserparse()){
1814 fprintf(stderr,"FTA parse failed on result from command %s.\n",cmd.c_str());
1816 if( fta_parse_result->tables != NULL){
1818 for(tl=0;tl<fta_parse_result->tables->size();++tl){
1819 Schema->add_table(fta_parse_result->tables->get_table(tl));
1822 fprintf(stderr,"ERROR command %s returned no tables.\n",cmd.c_str());
1827 fprintf(stderr,"ERROR, query %s (file %s) references stream %s, which is neither a PROTOCOL nor an externally visible query stream.\n", qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), refd_tbls[t].c_str());
1835 // Analyze the query.
1836 query_summary_class *qs = analyze_fta(fta_parse_tree, Schema, Ext_fcns, qnodes[i]->name);
1838 fprintf(stderr,"analyze_fta failed on query %s (file %s) \n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str());
1842 stream_query new_sq(qs, Schema);
1843 if(new_sq.error_code){
1844 fprintf(stderr,"ERROR, can't create query node for query %s (file %s):\n%s\n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str(),new_sq.err_str.c_str());
1848 // Add it to the Schema
1849 table_def *output_td = new_sq.get_output_tabledef();
1850 Schema->add_table(output_td);
1852 // Create a query plan from the analyzed parse tree.
1853 // If it's a query referenced via FROM, add it to the stream query.
1855 rootq->add_query(new_sq);
1857 rootq = new stream_query(new_sq);
1858 // have the stream query object inherit properties form the analyzed
1859 // hfta_node object.
1860 rootq->set_nparallel(hfta_sets[hfta_id]->n_parallel,hfta_sets[hfta_id]->parallel_idx);
1861 rootq->n_successors = hfta_sets[hfta_id]->sources_to.size();
1867 // This stream query has all its parts
1868 // Build and optimize it.
1869 //printf("translate_fta: generating plan.\n");
1870 if(rootq->generate_plan(Schema)){
1871 fprintf(stderr,"INTERNAL ERROR: Can't generate query plan for query %s.\n",rootq->query_name.c_str());
1875 // We've found the query plan head, so now add the output operators
1876 if(qname_to_ospec.count(hfta_sets[hfta_id]->source_name)){
1877 pair< multimap<string, int>::iterator, multimap<string, int>::iterator > oset;
1878 multimap<string, int>::iterator mmsi;
1879 oset = qname_to_ospec.equal_range(hfta_sets[hfta_id]->source_name);
1880 for(mmsi=oset.first; mmsi!=oset.second; ++mmsi){
1881 rootq->add_output_operator(output_specs[(*mmsi).second]);
1887 // Perform query splitting if necessary.
1889 vector<stream_query *> split_queries = rootq->split_query(Ext_fcns, Schema, hfta_returned, ifaces_db, n_virtual_interfaces, hfta_sets[hfta_id]->n_parallel, hfta_sets[hfta_id]->parallel_idx);
1892 //for(l=0;l<split_queries.size();++l){
1893 //printf("split query %d is %s\n",l,split_queries[l]->query_name.c_str());
1899 if(split_queries.size() > 0){ // should be at least one component.
1901 // Compute the number of LFTAs.
1902 int n_lfta = split_queries.size();
1903 if(hfta_returned) n_lfta--;
1904 // Check if a schemaId constraint needs to be inserted.
1906 // Process the LFTA components.
1907 for(l=0;l<n_lfta;++l){
1908 if(lfta_names.count(split_queries[l]->query_name) == 0){
1909 // Grab the lfta for global optimization.
1910 vector<tablevar_t *> tvec = split_queries[l]->query_plan[0]->get_input_tbls();
1911 string liface = "_local_";
1912 // string lmach = "";
1913 string lmach = hostname;
1915 liface = tvec[0]->get_interface(); // iface queries have been resolved
1916 if(tvec[0]->get_machine() != ""){
1917 lmach = tvec[0]->get_machine();
1919 fprintf(stderr,"WARNING, lfta %s has empty machine name, using %s\n", split_queries[l]->query_plan[0]->node_name.c_str(), hostname.c_str());
1922 interface_names.push_back(liface);
1923 machine_names.push_back(lmach);
1926 vector<predicate_t *> schemaid_preds;
1927 for(int irv=0;irv<tvec.size();++irv){
1929 string schema_name = tvec[irv]->get_schema_name();
1930 string rvar_name = tvec[irv]->get_var_name();
1931 int schema_ref = tvec[irv]->get_schema_ref();
1934 // interface_names.push_back(liface);
1935 // machine_names.push_back(lmach);
1937 //printf("Machine is %s\n",lmach.c_str());
1939 // Check if a schemaId constraint needs to be inserted.
1940 if(schema_ref<0){ // can result from some kinds of splits
1941 schema_ref = Schema->get_table_ref(schema_name);
1943 int schema_id = Schema->get_schema_id(schema_ref); // id associated with PROTOCOL
1946 iface_t *iface = ifaces_db-> get_interface(lmach, liface, errnum, if_error);
1948 fprintf(stderr,"Error looking up the interface %s on host %s: %s\n",liface.c_str(), lmach.c_str(), if_error.c_str());
1953 if(tvec[irv]->get_interface() != "_local_"){
1954 if(iface->has_multiple_schemas()){
1955 if(schema_id<0){ // invalid schema_id
1956 fprintf(stderr,"Error, schema %s has no schema_id, but is sourced from multi-schema interface %s (ref'd in query %s)\n", schema_name.c_str(), liface.c_str(), split_queries[0]->query_name.c_str());
1959 vector<string> iface_schemas = iface->get_property("Schemas");
1960 if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
1961 fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
1964 // Ensure that in liface, schema_id is used for only one schema
1965 if(schema_of_schemaid.count(liface)==0){
1966 map<int, string> empty_map;
1967 schema_of_schemaid[liface] = empty_map;
1969 if(schema_of_schemaid[liface].count(schema_id)==0){
1970 schema_of_schemaid[liface][schema_id] = schema_name;
1972 if(schema_of_schemaid[liface][schema_id] != schema_name){
1973 fprintf(stderr, "ERROR, For interface %s, schema id %d is used in schemas %s and %s\n", liface.c_str(), schema_id, schema_name.c_str(), schema_of_schemaid[liface][schema_id].c_str());
1977 }else{ // single-schema interface
1978 schema_id = -1; // don't generate schema_id predicate
1979 vector<string> iface_schemas = iface->get_property("Schemas");
1980 if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
1981 fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
1984 if(iface_schemas.size()>1){
1985 fprintf(stderr, "ERROR, interface %s is a single-schema interface, but has %d elements in its Schemas list.\n", liface.c_str(), (int)iface_schemas.size());
1993 // If we need to check the schema_id, insert a predicate into the lfta.
1994 // TODO not just schema_id, the full all_schema_ids set.
1996 colref_t *schid_cr = new colref_t("schemaId");
1997 schid_cr->schema_ref = schema_ref;
1998 schid_cr->table_name = rvar_name;
1999 schid_cr->tablevar_ref = 0;
2000 schid_cr->default_table = false;
2001 scalarexp_t *schid_se = new scalarexp_t(schid_cr);
2002 data_type *schid_dt = new data_type("uint");
2003 schid_se->dt = schid_dt;
2005 string schid_str = int_to_string(schema_id);
2006 literal_t *schid_lit = new literal_t(schid_str.c_str(), LITERAL_INT);
2007 scalarexp_t *lit_se = new scalarexp_t(schid_lit);
2008 lit_se->dt = schid_dt;
2010 predicate_t *schid_pr = new predicate_t(schid_se, "=", lit_se);
2011 vector<cnf_elem *> clist;
2012 make_cnf_from_pr(schid_pr, clist);
2013 analyze_cnf(clist[0]);
2014 clist[0]->cost = 1; // cheap one comparison
2015 // cnf built, now insert it.
2016 split_queries[l]->query_plan[0]->append_to_where(clist[0]);
2018 // Specialized processing
2019 // filter join, get two schemaid preds
2020 string node_type = split_queries[l]->query_plan[0]->node_type();
2021 if(node_type == "filter_join"){
2022 filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
2024 fj->pred_t0.push_back(clist[0]);
2026 fj->pred_t1.push_back(clist[0]);
2028 schemaid_preds.push_back(schid_pr);
2030 // watchlist join, get the first schemaid pred
2031 if(node_type == "watch_join"){
2032 watch_join_qpn *fj = (watch_join_qpn *)split_queries[l]->query_plan[0];
2034 fj->pred_t0.push_back(clist[0]);
2035 schemaid_preds.push_back(schid_pr);
2040 // Specialized processing, currently filter join.
2041 if(schemaid_preds.size()>1){
2042 string node_type = split_queries[l]->query_plan[0]->node_type();
2043 if(node_type == "filter_join"){
2044 filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
2045 predicate_t *filter_pr = new predicate_t("OR", schemaid_preds[0], schemaid_preds[1]);
2046 vector<cnf_elem *> clist;
2047 make_cnf_from_pr(filter_pr, clist);
2048 analyze_cnf(clist[0]);
2049 clist[0]->cost = 1; // cheap one comparison
2050 fj->shared_pred.push_back(clist[0]);
2060 // Set the ht size from the recommendation, if there is one in the rec file
2061 if(lfta_htsize.count(split_queries[l]->query_name)>0){
2062 split_queries[l]->query_plan[0]->set_definition("aggregate_slots",int_to_string(lfta_htsize[split_queries[l]->query_name]));
2066 lfta_names[split_queries[l]->query_name] = lfta_list.size();
2067 split_queries[l]->set_gid(lfta_list.size()); // set lfta global id
2068 lfta_list.push_back(split_queries[l]);
2069 lfta_mach_lists[lmach].push_back(split_queries[l]);
2071 // The following is a hack,
2072 // as I should be generating LFTA code through
2073 // the stream_query object.
2075 split_queries[l]->query_plan[0]->bind_to_schema(Schema);
2077 // split_queries[l]->query_plan[0]->definitions = split_queries[l]->defines;
2080 // Create query description to embed in lfta.c
2081 string lfta_schema_str = split_queries[l]->make_schema();
2082 string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
2084 // get NIC capabilities.
2086 nic_property *nicprop = NULL;
2087 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2088 if(iface_codegen_type.size()){
2089 nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2091 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2096 lfta_val[lmach] += generate_lfta_block(split_queries[l]->query_plan[0], Schema, split_queries[l]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop);
2099 snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema, "snap"));
2100 snap_position.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema, "index"));
2102 // STOPPED HERE need to figure out how to generate the code that Vlad needs
2103 // from snap_position
2105 // TODO NOTE : I'd like it to be the case that registration_query_names
2106 // are the queries to be registered for subscription.
2107 // but there is some complex bookkeeping here.
2108 registration_query_names.push_back(split_queries[l]->query_name);
2109 mach_query_names[lmach].push_back(registration_query_names.size()-1);
2110 // NOTE: I will assume a 1-1 correspondance between
2111 // mach_query_names[lmach] and lfta_mach_lists[lmach]
2112 // where mach_query_names[lmach][i] contains the index into
2113 // query_names, which names the lfta, and
2114 // mach_query_names[lmach][i] is the stream_query * of the
2115 // corresponding lfta.
2116 // Later, lfta_iface_qnames are the query names matching lfta_iface_lists
2120 // check if lfta is reusable
2121 // FTA is reusable if reusable option is set explicitly or it doesn't have any parameters
2123 bool lfta_reusable = false;
2124 if (split_queries[l]->query_plan[0]->get_val_of_def("reusable") == "yes" ||
2125 split_queries[l]->query_plan[0]->get_param_tbl()->size() == 0) {
2126 lfta_reusable = true;
2128 lfta_reuse_options.push_back(lfta_reusable);
2130 // LFTA will inherit the liveness timeout specification from the containing query
2131 // it is too conservative as lfta are expected to spend less time per tuple
2134 // extract liveness timeout from query definition
2135 int liveness_timeout = atoi(split_queries[l]->query_plan[0]->get_val_of_def("liveness_timeout").c_str());
2136 if (!liveness_timeout) {
2137 // fprintf(stderr, "WARNING: No liveness timeout specified for lfta %s, using default value of %d\n",
2138 // split_queries[l]->query_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2139 liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2141 lfta_liveness_timeouts.push_back(liveness_timeout);
2143 // Add it to the schema
2144 table_def *td = split_queries[l]->get_output_tabledef();
2145 Schema->append_table(td);
2146 //printf("added lfta %d (%s)\n",l,split_queries[l]->query_name.c_str());
2151 // If the output is lfta-only, dump out the query name.
2152 if(split_queries.size() == 1 && !hfta_returned){
2153 if(output_query_names ){
2154 fprintf(query_name_output,"%s L\n",split_queries[0]->query_name.c_str());
2158 fprintf(stderr,"query name is %s\n",split_queries[0]->query_name.c_str());
2163 // output schema summary
2164 if(output_schema_summary){
2165 dump_summary(split_queries[0]);
2171 if(hfta_returned){ // query also has an HFTA component
2172 int hfta_nbr = split_queries.size()-1;
2174 hfta_list.push_back(split_queries[hfta_nbr]);
2176 // report on generated query names
2177 if(output_query_names){
2178 string hfta_name =split_queries[hfta_nbr]->query_name;
2179 fprintf(query_name_output,"%s H\n",hfta_name.c_str());
2180 for(l=0;l<hfta_nbr;++l){
2181 string lfta_name =split_queries[l]->query_name;
2182 fprintf(query_name_output,"%s L\n",lfta_name.c_str());
2186 // fprintf(stderr,"query names are ");
2187 // for(l=0;l<hfta_nbr;++l){
2188 // if(l>0) fprintf(stderr,",");
2189 // string fta_name =split_queries[l]->query_name;
2190 // fprintf(stderr," %s",fta_name.c_str());
2192 // fprintf(stderr,"\n");
2197 fprintf(stderr,"ERROR, query in file %s has a bug.\n",qnodes[i]->file.c_str());
2198 fprintf(stderr,"%s\n",rootq->get_error_str().c_str());
2205 //-----------------------------------------------------------------
2206 // Compute and propagate the SE in PROTOCOL fields compute a field.
2207 //-----------------------------------------------------------------
2209 for(i=0;i<lfta_list.size();i++){
2210 lfta_list[i]->generate_protocol_se(sq_map, Schema);
2211 sq_map[lfta_list[i]->get_sq_name()] = lfta_list[i];
2213 for(i=0;i<hfta_list.size();i++){
2214 hfta_list[i]->generate_protocol_se(sq_map, Schema);
2215 sq_map[hfta_list[i]->get_sq_name()] = hfta_list[i];
2220 //------------------------------------------------------------------------
2221 // Perform individual FTA optimizations
2222 //-----------------------------------------------------------------------
2224 if (partitioned_mode) {
2226 // open partition definition file
2227 string part_fname = config_dir_path + "partition.txt";
2229 FILE* partfd = fopen(part_fname.c_str(), "r");
2231 fprintf(stderr,"ERROR, unable to open partition definition file %s\n",part_fname.c_str());
2234 PartnParser_setfileinput(partfd);
2235 if (PartnParserparse()) {
2236 fprintf(stderr,"ERROR, unable to parse partition definition file %s\n",part_fname.c_str());
2243 print_hftas("preopt_hfta_info.txt", hfta_list, lfta_names, registration_query_names, interface_names);
2245 int num_hfta = hfta_list.size();
2246 for(i=0; i < hfta_list.size(); ++i){
2247 hfta_list[i]->optimize(hfta_list, lfta_names, interface_names, machine_names, Ext_fcns, Schema, ifaces_db, partn_parse_result);
2250 // Add all new hftas to schema
2251 for(i=num_hfta; i < hfta_list.size(); ++i){
2252 table_def *td = hfta_list[i]->get_output_tabledef();
2253 Schema->append_table(td);
2256 print_hftas("postopt_hfta_info.txt", hfta_list, lfta_names, registration_query_names, interface_names);
2260 //------------------------------------------------------------------------
2261 // Do global (cross-fta) optimization
2262 //-----------------------------------------------------------------------
2269 set<string> extra_external_libs;
2271 for(i=0;i<hfta_list.size();++i){ // query also has an HFTA component
2274 // build hfta file name, create output
2275 if(numeric_hfta_flname){
2276 sprintf(tmpstr,"hfta_%d",hfta_count);
2277 hfta_names.push_back(tmpstr);
2278 sprintf(tmpstr,"hfta_%d.cc",hfta_count);
2280 sprintf(tmpstr,"hfta_%s",hfta_list[i]->query_name.c_str());
2281 hfta_names.push_back(tmpstr);
2282 sprintf(tmpstr,"hfta_%s.cc",hfta_list[i]->query_name.c_str());
2284 FILE *hfta_fl = fopen(tmpstr,"w");
2285 if(hfta_fl == NULL){
2286 fprintf(stderr,"ERROR can't open fta output file %s\n",tmpstr);
2289 fprintf(hfta_fl,"%s\n\n",hfta_list[i]->generate_hfta(Schema, Ext_fcns, opviews, distributed_mode).c_str());
2291 // If there is a field verifier, warn about
2292 // lack of compatibility
2293 // NOTE : this code assumes that visible non-lfta queries
2294 // are those at the root of a stream query.
2295 string hfta_comment;
2297 string hfta_namespace;
2298 if(hfta_list[i]->defines.count("comment")>0)
2299 hfta_comment = hfta_list[i]->defines["comment"];
2300 if(hfta_list[i]->defines.count("Comment")>0)
2301 hfta_comment = hfta_list[i]->defines["Comment"];
2302 if(hfta_list[i]->defines.count("COMMENT")>0)
2303 hfta_comment = hfta_list[i]->defines["COMMENT"];
2304 if(hfta_list[i]->defines.count("title")>0)
2305 hfta_title = hfta_list[i]->defines["title"];
2306 if(hfta_list[i]->defines.count("Title")>0)
2307 hfta_title = hfta_list[i]->defines["Title"];
2308 if(hfta_list[i]->defines.count("TITLE")>0)
2309 hfta_title = hfta_list[i]->defines["TITLE"];
2310 if(hfta_list[i]->defines.count("namespace")>0)
2311 hfta_namespace = hfta_list[i]->defines["namespace"];
2312 if(hfta_list[i]->defines.count("Namespace")>0)
2313 hfta_namespace = hfta_list[i]->defines["Namespace"];
2314 if(hfta_list[i]->defines.count("NAMESPACE")>0)
2315 hfta_namespace = hfta_list[i]->defines["NAMESPACE"];
2317 if(field_verifier != NULL){
2319 if(hfta_comment == "")
2320 warning_str += "\tcomment not found.\n";
2322 // Obsolete stuff that Carsten wanted
2323 // if(hfta_title == "")
2324 // warning_str += "\ttitle not found.\n";
2325 // if(hfta_namespace == "")
2326 // warning_str += "\tnamespace not found.\n";
2328 // There is a get_tbl_keys method implemented for qp_nodes,
2329 // integrate it into stream_query, then call it to find keys,
2330 // and annotate fields with their key-ness.
2331 // If there is a "keys" property in the defines block, override anything returned
2332 // from the automated analysis
2334 vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2336 for(fi=0;fi<flds.size();fi++){
2337 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
2339 if(warning_str != "")
2340 fprintf(stderr,"Warning, in HFTA stream %s:\n%s",
2341 hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(),warning_str.c_str());
2344 // Get the fields in this query
2345 vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2347 // do key processing
2348 string hfta_keys_s = "";
2349 if(hfta_list[i]->defines.count("keys")>0)
2350 hfta_keys_s = hfta_list[i]->defines["keys"];
2351 if(hfta_list[i]->defines.count("Keys")>0)
2352 hfta_keys_s = hfta_list[i]->defines["Keys"];
2353 if(hfta_list[i]->defines.count("KEYS")>0)
2354 hfta_keys_s = hfta_list[i]->defines["KEYS"];
2355 string xtra_keys_s = "";
2356 if(hfta_list[i]->defines.count("extra_keys")>0)
2357 xtra_keys_s = hfta_list[i]->defines["extra_keys"];
2358 if(hfta_list[i]->defines.count("Extra_Keys")>0)
2359 xtra_keys_s = hfta_list[i]->defines["Extra_Keys"];
2360 if(hfta_list[i]->defines.count("EXTRA_KEYS")>0)
2361 xtra_keys_s = hfta_list[i]->defines["EXTRA_KEYS"];
2363 vector<string> hfta_keys;
2364 vector<string> partial_keys;
2365 vector<string> xtra_keys;
2366 if(hfta_keys_s==""){
2367 hfta_keys = hfta_list[i]->get_tbl_keys(partial_keys);
2368 if(xtra_keys_s.size()>0){
2369 xtra_keys = split_string(xtra_keys_s, ',');
2371 for(int xi=0;xi<xtra_keys.size();++xi){
2372 if(find(hfta_keys.begin(), hfta_keys.end(), xtra_keys[xi]) == hfta_keys.end()){
2373 hfta_keys.push_back(xtra_keys[xi]);
2377 hfta_keys = split_string(hfta_keys_s, ',');
2379 // validate that all of the keys exist in the output.
2380 // (exit on error, as it's a bad specification)
2381 vector<string> missing_keys;
2382 for(int ki=0;ki<hfta_keys.size(); ++ki){
2384 for(fi=0;fi<flds.size();++fi){
2385 if(hfta_keys[ki] == flds[fi]->get_name())
2389 missing_keys.push_back(hfta_keys[ki]);
2391 if(missing_keys.size()>0){
2392 fprintf(stderr, "Error, HFTA stream %s has keys defined which aren't in the output:", hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
2393 for(int hi=0; hi<missing_keys.size(); ++hi){
2394 fprintf(stderr," %s", missing_keys[hi].c_str());
2396 fprintf(stderr,"\n");
2400 fprintf(qtree_output,"\t<HFTA name='%s'>\n",hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
2401 if(hfta_comment != "")
2402 fprintf(qtree_output,"\t\t<Description value='%s' />\n",hfta_comment.c_str());
2403 if(hfta_title != "")
2404 fprintf(qtree_output,"\t\t<Title value='%s' />\n",hfta_title.c_str());
2405 if(hfta_namespace != "")
2406 fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",hfta_namespace.c_str());
2407 fprintf(qtree_output,"\t\t<FileName value='%s' />\n",tmpstr);
2408 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2410 // write info about fields to qtree.xml
2412 for(fi=0;fi<flds.size();fi++){
2413 fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
2414 if(flds[fi]->get_modifier_list()->size()){
2415 fprintf(qtree_output,"mods='%s'", flds[fi]->get_modifier_list()->to_string().c_str());
2417 fprintf(qtree_output," />\n");
2420 for(int hi=0;hi<hfta_keys.size();++hi){
2421 fprintf(qtree_output, "\t\t<Key value='%s'/>\n",hfta_keys[hi].c_str());
2423 for(int hi=0;hi<partial_keys.size();++hi){
2424 fprintf(qtree_output, "\t\t<PartialKey value='%s'/>\n",partial_keys[hi].c_str());
2426 for(int hi=0;hi<xtra_keys.size();++hi){
2427 fprintf(qtree_output, "\t\t<ExtraKey value='%s'/>\n",xtra_keys[hi].c_str());
2431 // extract liveness timeout from query definition
2432 int liveness_timeout = atoi(hfta_list[i]->query_plan[hfta_list[i]->qhead]->get_val_of_def("liveness_timeout").c_str());
2433 if (!liveness_timeout) {
2434 // fprintf(stderr, "WARNING: No liveness timeout specified for hfta %s, using default value of %d\n",
2435 // hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2436 liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2438 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", liveness_timeout);
2440 vector<tablevar_t *> tmp_tv = hfta_list[i]->get_input_tables();
2442 for(itv=0;itv<tmp_tv.size();++itv){
2443 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",tmp_tv[itv]->get_schema_name().c_str());
2445 string ifrs = hfta_list[i]->collect_refd_ifaces();
2447 fprintf(qtree_output,"\t\t<Ifaces_referenced value='%s'/>\n",ifrs.c_str());
2449 fprintf(qtree_output,"\t</HFTA>\n");
2453 // debug only -- do code generation to catch generation-time errors.
2454 hfta_list[i]->generate_hfta(Schema, Ext_fcns,opviews, distributed_mode);
2457 hfta_count++; // for hfta file names with numeric suffixes
2459 hfta_list[i]->get_external_libs(extra_external_libs);
2463 string ext_lib_string;
2464 set<string>::iterator ssi_el;
2465 for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_el)
2466 ext_lib_string += (*ssi_el)+" ";
2470 // Report on the set of operator views
2471 for(i=0;i<opviews.size();++i){
2472 opview_entry *opve = opviews.get_entry(i);
2473 fprintf(qtree_output,"\t<UDOP name='%s' >\n",opve->view_name.c_str());
2474 fprintf(qtree_output,"\t\t<FileName value='%s' />\n",opve->exec_fl.c_str());
2475 fprintf(qtree_output,"\t\t<Parent value='%s' />\n",opve->root_name.c_str());
2476 fprintf(qtree_output,"\t\t<Alias value='%s' />\n",opve->udop_alias.c_str());
2477 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2479 if (!opve->liveness_timeout) {
2480 // fprintf(stderr, "WARNING: No liveness timeout specified for view %s, using default value of %d\n",
2481 // opve->view_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2482 opve->liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2484 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", opve->liveness_timeout);
2486 for(j=0;j<opve->subq_names.size();j++)
2487 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",opve->subq_names[j].c_str());
2488 fprintf(qtree_output,"\t</UDOP>\n");
2492 //-----------------------------------------------------------------
2494 // Create interface-specific meta code files.
2495 // first, open and parse the interface resources file.
2496 ifaces_db = new ifq_t();
2498 if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
2499 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
2500 ifx_fname.c_str(), ierr.c_str());
2504 map<string, vector<stream_query *> >::iterator svsi;
2505 for(svsi=lfta_mach_lists.begin(); svsi!=lfta_mach_lists.end(); ++svsi){
2506 string lmach = (*svsi).first;
2508 // For this machine, create a set of lftas per interface.
2509 vector<stream_query *> mach_lftas = (*svsi).second;
2510 map<string, vector<stream_query *> > lfta_iface_lists;
2512 for(li=0;li<mach_lftas.size();++li){
2513 vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
2514 string lfta_iface = "_local_";
2516 string lfta_iface = tvec[0]->get_interface();
2518 lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2521 map<string, vector<stream_query *> >::iterator lsvsi;
2522 for(lsvsi=lfta_iface_lists.begin(); lsvsi!=lfta_iface_lists.end(); ++lsvsi){
2524 string liface = (*lsvsi).first;
2525 vector<stream_query *> iface_lftas = (*lsvsi).second;
2526 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2527 if(iface_codegen_type.size()){
2528 nic_property *nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2530 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2533 string mcs = generate_nic_code(iface_lftas, nicprop);
2536 mcf_flnm = lmach + "_"+liface+".mcf";
2538 mcf_flnm = hostname + "_"+liface+".mcf";
2540 if((mcf_fl = fopen(mcf_flnm.c_str(),"w")) == NULL){
2541 fprintf(stderr,"Can't open output file %s\n%s\n",mcf_flnm.c_str(),strerror(errno));
2544 fprintf(mcf_fl,"%s",mcs.c_str());
2546 //printf("mcs of machine %s, iface %s of type %s is \n%s\n",
2547 //lmach.c_str(), liface.c_str(),iface_codegen_type[0].c_str(), mcs.c_str());
2556 //-----------------------------------------------------------------
2559 // Find common filter predicates in the LFTAs.
2560 // in addition generate structs to store the
2561 // temporal attributes unpacked by prefilter
2562 // compute & provide interface for per-interface
2563 // record extraction properties
2565 map<string, vector<stream_query *> >::iterator ssqi;
2566 for(ssqi=lfta_mach_lists.begin(); ssqi!=lfta_mach_lists.end(); ++ssqi){
2568 string lmach = (*ssqi).first;
2569 bool packed_return = false;
2573 // The LFTAs of this machine.
2574 vector<stream_query *> mach_lftas = (*ssqi).second;
2575 // break up on a per-interface basis.
2576 map<string, vector<stream_query *> > lfta_iface_lists;
2577 map<string, vector<int> > lfta_iface_qname_ix; // need the query name
2579 for(li=0;li<mach_lftas.size();++li){
2580 vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
2581 string lfta_iface = "_local_";
2583 lfta_iface = tvec[0]->get_interface();
2585 lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2586 lfta_iface_qname_ix[lfta_iface].push_back(mach_query_names[lmach][li]);
2590 // Are the return values "packed"?
2591 // This should be done on a per-interface basis.
2592 // But this is defunct code for gs-lite
2593 for(li=0;li<mach_lftas.size();++li){
2594 vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
2595 string liface = "_local_";
2597 liface = tvec[0]->get_interface();
2599 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2600 if(iface_codegen_type.size()){
2601 if(npdb->get_nic_prop_val(iface_codegen_type[0],"Return",erri) == "Packed"){
2602 packed_return = true;
2608 // Separate lftas by interface, collect results on a per-interface basis.
2610 vector<cnf_set *> no_preds; // fallback if there is no prefilter
2611 map<string, vector<cnf_set *> > prefilter_preds;
2612 set<unsigned int> pred_ids; // this can be global for all interfaces
2613 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2614 string liface = (*mvsi).first;
2615 vector<cnf_set *> empty_list;
2616 prefilter_preds[liface] = empty_list;
2617 if(! packed_return){
2618 get_common_lfta_filter(lfta_iface_lists[liface], Schema,Ext_fcns, prefilter_preds[liface], pred_ids);
2621 // get NIC capabilities. (Is this needed?)
2622 nic_property *nicprop = NULL;
2623 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2624 if(iface_codegen_type.size()){
2625 nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2627 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2634 // Now that we know the prefilter preds, generate the lfta code.
2635 // Do this for all lftas in this machine.
2636 for(li=0;li<mach_lftas.size();++li){
2637 set<unsigned int> subsumed_preds;
2638 set<unsigned int>::iterator sii;
2640 for(sii=pred_ids.begin();sii!=pred_ids.end();++sii){
2642 if((pid>>16) == li){
2643 subsumed_preds.insert(pid & 0xffff);
2647 string lfta_schema_str = mach_lftas[li]->make_schema();
2648 string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
2649 nic_property *nicprop = NULL; // no NIC properties?
2650 lfta_val[lmach] += generate_lfta_block(mach_lftas[li]->query_plan[0], Schema, mach_lftas[li]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop,subsumed_preds);
2654 // generate structs to store the temporal attributes
2655 // unpacked by prefilter
2656 col_id_set temp_cids;
2657 get_prefilter_temporal_cids(lfta_mach_lists[lmach], temp_cids);
2658 lfta_prefilter_val[lmach] = generate_lfta_prefilter_struct(temp_cids, Schema);
2660 // Compute the lfta bit signatures and the lfta colrefs
2661 // do this on a per-interface basis
2663 lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2665 map<string, vector<long long int> > lfta_sigs; // used again later
2666 map<string, int> lfta_snap_pos; // optimize csv parsing
2667 // compute now, use in get_iface_properties
2668 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2669 string liface = (*mvsi).first;
2670 vector<long long int> empty_list;
2671 lfta_sigs[liface] = empty_list;
2672 lfta_snap_pos[liface] = -1;
2674 vector<col_id_set> lfta_cols;
2675 vector<int> lfta_snap_length;
2676 for(li=0;li<lfta_iface_lists[liface].size();++li){
2677 unsigned long long int mask=0, bpos=1;
2679 for(f_pos=0;f_pos<prefilter_preds[liface].size();++f_pos){
2680 if(prefilter_preds[liface][f_pos]->lfta_id.count(li))
2684 lfta_sigs[liface].push_back(mask);
2685 lfta_cols.push_back(lfta_iface_lists[liface][li]->query_plan[0]->get_colrefs(true,Schema));
2686 lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema, "snap"));
2687 int this_snap_pos = compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema, "index");
2688 if(this_snap_pos > lfta_snap_pos[liface])
2689 lfta_snap_pos[liface] = this_snap_pos;
2692 //for(li=0;li<mach_lftas.size();++li){
2693 //printf("lfta %d, msak is %llu\n",li,lfta_sigs[li]);
2694 //col_id_set::iterator tcisi;
2695 //for(tcisi=lfta_cols[li].begin(); tcisi!=lfta_cols[li].end();++tcisi){
2696 //printf("\t%s %d (%d)\n",(*tcisi).field.c_str(),(*tcisi).schema_ref, (*tcisi).tblvar_ref);
2701 // generate the prefilter
2702 // Do this on a per-interface basis, except for the #define
2704 // lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2705 lfta_val[lmach] += generate_lfta_prefilter(prefilter_preds[liface], temp_cids, Schema, Ext_fcns, lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2707 lfta_val[lmach] += generate_lfta_prefilter(no_preds, temp_cids, Schema,Ext_fcns, lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2712 // Generate interface parameter lookup function
2713 lfta_val[lmach] += "// lookup interface properties by name\n";
2714 lfta_val[lmach] += "// mulitple values of given property returned in the same string separated by commas\n";
2715 lfta_val[lmach] += "// returns NULL if given property does not exist\n";
2716 lfta_val[lmach] += "gs_sp_t get_iface_properties (const gs_sp_t iface_name, const gs_sp_t property_name) {\n";
2718 // collect a list of interface names used by queries running on this host
2719 set<std::string> iface_names;
2720 for(i=0;i<mach_query_names[lmach].size();i++){
2721 int mi = mach_query_names[lmach][i];
2722 stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2724 if(interface_names[mi]=="")
2725 iface_names.insert("DEFAULTDEV");
2727 iface_names.insert(interface_names[mi]);
2730 // generate interface property lookup code for every interface
2731 set<std::string>::iterator sir;
2732 for (sir = iface_names.begin(); sir != iface_names.end(); sir++) {
2733 if (sir == iface_names.begin())
2734 lfta_val[lmach] += "\tif (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2736 lfta_val[lmach] += "\t} else if (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2738 // iterate through interface properties
2739 vector<string> iface_properties;
2740 if(*sir!="_local_"){ // dummy watchlist interface, don't process.
2741 iface_properties = ifaces_db->get_iface_properties(lmach,*sir,erri,err_str);
2744 fprintf(stderr,"ERROR cannot retrieve interface properties for %s.%s, %s\n",lmach.c_str(), sir->c_str(), err_str.c_str());
2747 if (iface_properties.empty())
2748 lfta_val[lmach] += "\t\treturn NULL;\n";
2750 for (int i = 0; i < iface_properties.size(); ++i) {
2752 lfta_val[lmach] += "\t\tif (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2754 lfta_val[lmach] += "\t\t} else if (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2756 // combine all values for the interface property using comma separator
2757 vector<string> vals = ifaces_db->get_iface_vals(lmach,*sir,iface_properties[i],erri,err_str);
2758 lfta_val[lmach] += "\t\t\treturn \"";
2759 for (int j = 0; j < vals.size(); ++j) {
2760 lfta_val[lmach] += vals[j];
2761 if (j != vals.size()-1)
2762 lfta_val[lmach] += ",";
2764 lfta_val[lmach] += "\";\n";
2766 lfta_val[lmach] += "\t\t}else if(!strcmp(property_name, \"_max_csv_pos\")){\n";
2767 lfta_val[lmach] += "\t\t\treturn \""+int_to_string(lfta_snap_pos[(*sir)])+"\";\n";
2768 lfta_val[lmach] += "\t\t} else\n";
2769 lfta_val[lmach] += "\t\t\treturn NULL;\n";
2772 lfta_val[lmach] += "\t} else\n";
2773 lfta_val[lmach] += "\t\treturn NULL;\n";
2774 lfta_val[lmach] += "}\n\n";
2777 // Generate a full list of FTAs for clearinghouse reference
2778 lfta_val[lmach] += "// list of FTAs clearinghouse expects to register themselves\n";
2779 lfta_val[lmach] += "const gs_sp_t fta_names[] = {";
2782 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2783 string liface = (*mvsi).first;
2784 if(liface != "_local_"){ // these don't register themselves
2785 vector<stream_query *> lfta_list = (*mvsi).second;
2786 for(i=0;i<lfta_list.size();i++){
2787 int mi = lfta_iface_qname_ix[liface][i];
2788 if(first) first = false;
2789 else lfta_val[lmach] += ", ";
2790 lfta_val[lmach] += "\"" + registration_query_names[mi] + "\"";
2794 // for (i = 0; i < registration_query_names.size(); ++i) {
2796 // lfta_val[lmach] += ", ";
2797 // lfta_val[lmach] += "\"" + registration_query_names[i] + "\"";
2800 for (i = 0; i < hfta_list.size(); ++i) {
2801 lfta_val[lmach] += ", \"" + hfta_list[i]->query_name + "\"";
2803 lfta_val[lmach] += ", NULL};\n\n";
2806 // Add the initialization function to lfta.c
2807 // Change to accept the interface name, and
2808 // set the prefilter function accordingly.
2809 // see the example in demo/err2
2810 lfta_val[lmach] += "void fta_init(gs_sp_t device){\n";
2811 lfta_val[lmach] += "// note: the last parameter in fta_register is the prefilter signature\n";
2813 // for(i=0;i<mach_query_names[lmach].size();i++)
2814 // int mi = mach_query_names[lmach][i];
2815 // stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2817 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2818 string liface = (*mvsi).first;
2819 vector<stream_query *> lfta_list = (*mvsi).second;
2820 for(i=0;i<lfta_list.size();i++){
2821 stream_query *lfta_sq = lfta_list[i];
2822 int mi = lfta_iface_qname_ix[liface][i];
2824 if(liface == "_local_"){
2825 // Don't register an init function, do the init code inline
2826 lfta_val[lmach] += "// initialization code for "+registration_query_names[mi]+";\n";
2827 lfta_val[lmach] += "\t"+generate_watchlist_name(registration_query_names[mi])+".filename = "+generate_watchlist_name(registration_query_names[mi])+"__fstr;\n";
2831 fprintf(stderr,"interface %s, LFTA %d, snap length is %d\n",liface.c_str(),mi,snap_lengths[mi]);
2833 string this_iface = "DEFAULTDEV";
2834 if(interface_names[mi]!="")
2835 this_iface = '"'+interface_names[mi]+'"';
2836 lfta_val[lmach] += "\tif(!strcmp(device,"+this_iface+"))\n";
2837 lfta_val[lmach] += "\t\tfta_register(\""+registration_query_names[mi]+"\", " + (lfta_reuse_options[mi]?"1":"0") + ", ";
2838 // if(interface_names[mi]=="")
2839 // lfta_val[lmach]+="DEFAULTDEV";
2841 // lfta_val[lmach]+='"'+interface_names[mi]+'"';
2842 lfta_val[lmach] += this_iface;
2845 lfta_val[lmach] += ", "+generate_alloc_name(registration_query_names[mi])
2846 +"\n#ifndef LFTA_IN_NIC\n\t,"+generate_schema_string_name(registration_query_names[mi])
2848 sprintf(tmpstr,",%d",snap_lengths[mi]);
2849 lfta_val[lmach] += tmpstr;
2851 // unsigned long long int mask=0, bpos=1;
2853 // for(f_pos=0;f_pos<prefilter_preds.size();++f_pos){
2854 // if(prefilter_preds[f_pos]->lfta_id.count(i))
2856 // bpos = bpos << 1;
2860 // sprintf(tmpstr,",%lluull",mask);
2861 sprintf(tmpstr,",%lluull",lfta_sigs[liface][i]);
2862 lfta_val[lmach]+=tmpstr;
2864 lfta_val[lmach] += ",0ull";
2867 lfta_val[lmach] += ");\n";
2871 // End of lfta prefilter stuff
2872 // --------------------------------------------------
2874 // If there is a field verifier, warn about
2875 // lack of compatibility
2876 string lfta_comment;
2878 string lfta_namespace;
2879 map<string,string> ldefs = lfta_sq->query_plan[0]->get_definitions();
2880 if(ldefs.count("comment")>0)
2881 lfta_comment = lfta_sq->defines["comment"];
2882 if(ldefs.count("Comment")>0)
2883 lfta_comment = lfta_sq->defines["Comment"];
2884 if(ldefs.count("COMMENT")>0)
2885 lfta_comment = lfta_sq->defines["COMMENT"];
2886 if(ldefs.count("title")>0)
2887 lfta_title = lfta_sq->defines["title"];
2888 if(ldefs.count("Title")>0)
2889 lfta_title = lfta_sq->defines["Title"];
2890 if(ldefs.count("TITLE")>0)
2891 lfta_title = lfta_sq->defines["TITLE"];
2892 if(ldefs.count("NAMESPACE")>0)
2893 lfta_namespace = lfta_sq->defines["NAMESPACE"];
2894 if(ldefs.count("Namespace")>0)
2895 lfta_namespace = lfta_sq->defines["Namespace"];
2896 if(ldefs.count("namespace")>0)
2897 lfta_namespace = lfta_sq->defines["namespace"];
2899 string lfta_ht_size;
2900 if(lfta_sq->query_plan[0]->node_type()== "sgah_qpn" || lfta_sq->query_plan[0]->node_type()== "rsgah_qpn")
2901 lfta_ht_size = int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE);
2902 if(ldefs.count("aggregate_slots")>0){
2903 lfta_ht_size = ldefs["aggregate_slots"];
2906 // NOTE : I'm assuming that visible lftas do not start with _fta.
2907 // -- will fail for non-visible simple selection queries.
2908 if(field_verifier != NULL && registration_query_names[mi].substr(0,1) != "_"){
2910 if(lfta_comment == "")
2911 warning_str += "\tcomment not found.\n";
2912 // Obsolete stuff that carsten wanted
2913 // if(lfta_title == "")
2914 // warning_str += "\ttitle not found.\n";
2915 // if(lfta_namespace == "")
2916 // warning_str += "\tnamespace not found.\n";
2918 vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
2920 for(fi=0;fi<flds.size();fi++){
2921 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
2923 if(warning_str != "")
2924 fprintf(stderr,"Warning, in LFTA stream %s:\n%s",
2925 registration_query_names[mi].c_str(),warning_str.c_str());
2929 // Create qtree output
2930 fprintf(qtree_output,"\t<LFTA name='%s' >\n",registration_query_names[mi].c_str());
2931 if(lfta_comment != "")
2932 fprintf(qtree_output,"\t\t<Description value='%s' />\n",lfta_comment.c_str());
2933 if(lfta_title != "")
2934 fprintf(qtree_output,"\t\t<Title value='%s' />\n",lfta_title.c_str());
2935 if(lfta_namespace != "")
2936 fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",lfta_namespace.c_str());
2937 if(lfta_ht_size != "")
2938 fprintf(qtree_output,"\t\t<HtSize value='%s' />\n",lfta_ht_size.c_str());
2940 fprintf(qtree_output,"\t\t<Host value='%s' />\n",lmach.c_str());
2942 fprintf(qtree_output,"\t\t<Host value='%s' />\n",hostname.c_str());
2943 fprintf(qtree_output,"\t\t<Interface value='%s' />\n",interface_names[mi].c_str());
2944 std::vector<tablevar_t *> itbls = lfta_sq->get_input_tables();
2945 for(int t=0;t<itbls.size();++t){
2946 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",itbls[t]->get_schema_name().c_str());
2948 // fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",interface_names[mi].c_str());
2949 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2950 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", lfta_liveness_timeouts[mi]);
2951 // write info about fields to qtree.xml
2952 vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
2954 for(fi=0;fi<flds.size();fi++){
2955 fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
2956 if(flds[fi]->get_modifier_list()->size()){
2957 fprintf(qtree_output,"mods='%s' ", flds[fi]->get_modifier_list()->to_string().c_str());
2959 fprintf(qtree_output," />\n");
2961 fprintf(qtree_output,"\t</LFTA>\n");
// Generate the prefilter-selection code: at runtime, pick the per-interface
// prefilter whose name matches the device string.
2967 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2968 string liface = (*mvsi).first;
2970 "	if (!strcmp(device, \""+liface+"\")) \n"
2971 "		lfta_prefilter = &lfta_prefilter_"+liface+"; \n"
// Generated guard: fail loudly if no prefilter matched the device.
2975 "	if(lfta_prefilter == NULL){\n"
2976 "		fprintf(stderr, \"Error, interface %s not found in fta_init.\\n\",device);\n"
2983 lfta_val[lmach] += "}\n\n";
// Unless doing a debug-only or hfta-only run, write the accumulated generated
// C source out to <machine-or-host>_lfta.c.
2985 if(!(debug_only || hfta_only) ){
2988 lfta_flnm = lmach + "_lfta.c";
2990 lfta_flnm = hostname + "_lfta.c";
2991 if((lfta_out = fopen(lfta_flnm.c_str(),"w")) == NULL){
// NOTE(review): the error message hardcodes "lfta.c" rather than the actual
// file name in lfta_flnm — misleading when the open fails.
2992 fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
2995 fprintf(lfta_out,"%s",lfta_header.c_str());
2996 fprintf(lfta_out,"%s",lfta_prefilter_val[lmach].c_str());
2997 fprintf(lfta_out,"%s",lfta_val[lmach].c_str());
// List the external operators (UDOPs) the queries depend on.
3002 // Say what are the operators which must execute
3003 if(opviews.size()>0)
3004 fprintf(stderr,"The queries use the following external operators:\n");
3005 for(i=0;i<opviews.size();++i){
3006 opview_entry *opv = opviews.get_entry(i);
3007 fprintf(stderr,"\t%s\n",opv->view_name.c_str());
// Emit the build/run scripts, then close the qtree XML document.
3011 generate_makefile(input_file_names, nfiles, hfta_names, opviews,
3012 machine_names, schema_file_name,
3014 ifaces_db, config_dir_path,use_pads,ext_lib_string, rts_hload);
3017 fprintf(qtree_output,"</QueryNodes>\n");
3022 ////////////////////////////////////////////////////////////
// generate_makefile : write a Makefile (plus, below, runit/stopit scripts)
// that builds the runtime system ("rts") from the generated <host>_lfta.c,
// builds each HFTA executable, and re-runs translate_fta when inputs change.
// Parameters:
//   input_file_names / nfiles : the query files the Makefile depends on.
//   hfta_names                : names of the generated HFTA programs.
//   opviews                   : external operators (UDOPs) in use.
//   machine_names             : machine per interface (parallel to interface_names).
//   schema_file_name          : packet schema file the build depends on.
//   interface_names           : interfaces referenced by the queries.
//   ifdb                      : interface-resource database for attribute lookups.
//   config_dir_path           : in/out; rewritten to a "-C <dir>" flag if nonempty.
//   rts_hload                 : per-interface hash-load allocation (virtual ifaces).
// NOTE(review): additional parameters (e.g. use_pads, ext_lib_string seen at the
// call site) are elided from this excerpt, as are many statements in the body.
3024 void generate_makefile(vector<string> &input_file_names, int nfiles,
3025 vector<string> &hfta_names, opview_set &opviews,
3026 vector<string> &machine_names,
3027 string schema_file_name,
3028 vector<string> &interface_names,
3029 ifq_t *ifdb, string &config_dir_path,
3032 map<string, vector<int> > &rts_hload
3036 if(config_dir_path != ""){
3037 config_dir_path = "-C "+config_dir_path;
// Probe for optional libraries; stat() returns 0 on success, so >= 0 means found.
3041 bool libz_exists = stat((root_path+"/lib/libz.a").c_str(),&sb) >= 0;
3042 bool libast_exists = stat((root_path+"/lib/libast.a").c_str(),&sb) >= 0;
3044 // if(libz_exists && !libast_exists){
3045 // fprintf(stderr,"Configuration ERROR: ../../lib/libz.a exists but ../../lib/libast.a does not.\n");
3049 // Get set of operator executable files to run
3051 set<string>::iterator ssi;
3052 for(i=0;i<opviews.size();++i){
3053 opview_entry *opv = opviews.get_entry(i);
3054 if(opv->exec_fl != "") op_fls.insert(opv->exec_fl);
3057 FILE *outfl = fopen("Makefile", "w");
3059 fprintf(stderr,"Can't open Makefile for write, exiting.\n");
// Compiler variables for the generated Makefile: g++ for HFTAs, gcc for the LFTA.
3064 ("CPP= g++ -O3 -g -I "+root_path+"/include -I "+root_path+"/include/hfta\n"
3065 "CC= gcc -O3 -g -I . -I "+root_path+"/include -I "+root_path+"/include/lfta"
3069 fprintf(outfl," -DLFTA_STATS");
3071 // Gather the set of interfaces
3072 // Also, gather "base interface names" for use in computing
3073 // the hash splitting to virtual interfaces.
3074 // TODO : must update to handle machines
3076 set<string> base_vifaces; // base interfaces of virtual interfaces
3077 map<string, string> ifmachines;
3078 map<string, string> ifattrs;
3079 for(i=0;i<interface_names.size();++i){
3080 ifaces.insert(interface_names[i]);
3081 ifmachines[interface_names[i]] = machine_names[i];
// Virtual interfaces are named <base>X<n>; strip at the last 'X' to get the base.
3083 size_t Xpos = interface_names[i].find_last_of("X");
3084 if(Xpos!=string::npos){
3085 string iface = interface_names[i].substr(0,Xpos);
3086 base_vifaces.insert(iface);
3088 // get interface attributes and add them to the list
3091 // Do we need to include protobuf libraries?
3092 // TODO Move to the interface library: get the libraries to include
3093 // for an interface type
// Decide which optional feature libraries the rts link line needs, by
// inspecting interface attributes; warn if the runtime was built without
// the matching *_ENABLED compile-time support.
3095 bool use_proto = false;
3096 bool use_bsa = false;
3097 bool use_kafka = false;
3098 bool use_ssl = false;
3101 for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
3102 string ifnm = (*ssi);
3103 vector<string> ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "InterfaceType", erri, err_str);
3104 for(int ift_i=0;ift_i<ift.size();ift_i++){
3105 if(ift[ift_i]=="PROTO"){
3106 #ifdef PROTO_ENABLED
3109 fprintf(stderr,"Runtime libraries built without PROTO support. Rebuild with PROTO_ENABLED defined in gsoptions.h\n");
3114 ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "BSA", erri, err_str);
3115 for(int ift_i=0;ift_i<ift.size();ift_i++){
3116 if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){
3120 fprintf(stderr,"Runtime libraries built without BSA support. Rebuild with BSA_ENABLED defined in gsoptions.h\n");
3125 ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "KAFKA", erri, err_str);
3126 for(int ift_i=0;ift_i<ift.size();ift_i++){
3127 if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){
3128 #ifdef KAFKA_ENABLED
3131 fprintf(stderr,"Runtime libraries built without KAFKA support. Rebuild with KAFKA_ENABLED defined in gsoptions.h\n");
3136 ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "ENCRYPTED", erri, err_str);
3137 for(int ift_i=0;ift_i<ift.size();ift_i++){
3138 if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){
3142 fprintf(stderr,"Runtime libraries built without SSL support. Rebuild with SSL_ENABLED defined in gsoptions.h\n");
// Default target: rts plus every HFTA executable.
3153 for(i=0;i<hfta_names.size();++i)
3154 fprintf(outfl," %s",hfta_names[i].c_str());
// rts link rule; extra -l flags below are appended conditionally based on the
// feature probes above.
3158 "rts: lfta.o "+root_path+"/lib/libgscphost.a "+root_path+"/lib/libgscplftaaux.a "+root_path+"/lib/libgscprts.a "+root_path+"/lib/libclearinghouse.a\n"
3159 "\tg++ -O3 -g -o rts lfta.o ").c_str(), outfl);
3161 fprintf(outfl,"-L. ");
3163 ("-L"+root_path+"/lib -lgscplftaaux ").c_str(), outfl);
3165 fprintf(outfl,"-lgscppads -lpads ");
3167 "-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux -lclearinghouse -lresolv -lpthread -lgscpinterface -lz -lrt");
3169 fprintf(outfl, " -lpz -lz -lbz ");
3170 if(libz_exists && libast_exists)
3171 fprintf(outfl," -last ");
3173 fprintf(outfl, " -ldll -ldl ");
3175 #ifdef PROTO_ENABLED
3176 fprintf(outfl, " -L/usr/local/lib/ -lprotobuf-c ");
3179 fprintf(outfl, " -lbsa_stream ");
3181 #ifdef KAFKA_ENABLED
3182 fprintf(outfl, " -lrdkafka ");
3185 fprintf(outfl, " -lssl -lcrypto ");
3187 fprintf(outfl," -lgscpaux");
3189 fprintf(outfl," -fprofile-arcs");
// Rules to compile the generated LFTA source and to regenerate it (by
// re-running translate_fta) when query or schema inputs change.
3194 "lfta.o: %s_lfta.c\n"
3195 "\t$(CC) -o lfta.o -c %s_lfta.c\n"
3197 "%s_lfta.c: external_fcns.def %s ",hostname.c_str(), hostname.c_str(), hostname.c_str(),schema_file_name.c_str())
3198 for(i=0;i<nfiles;++i)
3199 fprintf(outfl," %s",input_file_names[i].c_str());
3201 fprintf(outfl,"\n\t%s/bin/translate_fta %s %s ",root_path.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
3203 fprintf(outfl,"\n\t%s/bin/translate_fta -h %s %s %s ", root_path.c_str(), hostname.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
3205 for(i=0;i<nfiles;++i)
3206 fprintf(outfl," %s",input_file_names[i].c_str());
3207 fprintf(outfl,"\n");
// Per-HFTA link and compile rules.
3209 for(i=0;i<hfta_names.size();++i)
3212 "\t$(CPP) -o %s %s.o -L"+root_path+"/lib -lgscpapp -lgscphostaux -lgscphost -lgscpinterface -lgscphftaaux -lgscphostaux -lm -lgscpaux -lclearinghouse -lresolv -lpthread -lgscpaux -lgscphftaaux -lgscpaux %s\n"
3215 "\t$(CPP) -o %s.o -c %s.cc\n"
3218 hfta_names[i].c_str(), hfta_names[i].c_str(),
3219 hfta_names[i].c_str(), hfta_names[i].c_str(), extra_libs.c_str(),
3220 hfta_names[i].c_str(), hfta_names[i].c_str(),
3221 hfta_names[i].c_str(), hfta_names[i].c_str()
// Convenience rules: symlink config files into the build dir; clean target.
3226 "packet_schema.txt:\n"
3227 "\tln -s "+root_path+"/cfg/packet_schema.txt .\n"
3229 "external_fcns.def:\n"
3230 "\tln -s "+root_path+"/cfg/external_fcns.def .\n"
3233 "\trm -rf core rts *.o %s_lfta.c external_fcns.def packet_schema.txt").c_str(),hostname.c_str());
3234 for(i=0;i<hfta_names.size();++i)
3235 fprintf(outfl," %s %s.cc",hfta_names[i].c_str(),hfta_names[i].c_str());
3236 fprintf(outfl,"\n");
// Generate the "runit" shell script: start gshub.py, wait for its log file,
// launch rts with every (non-internal) interface's command arguments, then
// launch each HFTA and each external-operator view, recording PIDs/PGIDs in
// gs.pids so "stopit" can tear everything down.
3242 // Gather the set of interfaces
3243 // TODO : must update to handle machines
3244 // TODO : lookup interface attributes and add them as a parameter to rts process
3245 outfl = fopen("runit", "w");
3247 fprintf(stderr,"Can't open runit for write, exiting.\n");
3255 +root_path+"/bin/gshub.py> /dev/null 2>&1 &\n"
3257 "if [ ! -f gshub.log ]\n"
3259 "\techo \"Failed to start bin/gshub.py\"\n"
3262 "ADDR=`cat gshub.log`\n"
3263 "ps opgid= $! >> gs.pids\n"
3264 "./rts $ADDR default ").c_str(), outfl);
3267 for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
3268 string ifnm = (*ssi);
3269 // suppress internal _local_ interface
3270 if (ifnm == "_local_")
3272 fprintf(outfl, "%s ",ifnm.c_str());
// Append the per-interface "Command" attribute values as rts arguments.
3273 vector<string> ifv = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "Command", erri, err_str);
3274 for(j=0;j<ifv.size();++j)
3275 fprintf(outfl, "%s ",ifv[j].c_str());
3277 fprintf(outfl, " &\n");
3278 fprintf(outfl, "echo $! >> gs.pids\n");
3279 for(i=0;i<hfta_names.size();++i)
3280 fprintf(outfl,"./%s $ADDR default &\n",hfta_names[i].c_str());
3282 for(j=0;j<opviews.opview_list.size();++j){
3283 fprintf(outfl,"%s/views/%s %s &\n",root_path.c_str(),opviews.opview_list[j]->exec_fl.c_str(), opviews.opview_list[j]->mangler.c_str());
3287 system("chmod +x runit");
// Generate the "stopit" script: TERM (then, presumably in elided lines, KILL)
// every process group recorded in gs.pids.
3289 outfl = fopen("stopit", "w");
3291 fprintf(stderr,"Can't open stopit for write, exiting.\n");
3295 fprintf(outfl,"#!/bin/sh\n"
3297 "if [ ! -f gs.pids ]\n"
3301 "for pgid in `cat gs.pids`\n"
3303 "kill -TERM -$pgid\n"
3306 "for pgid in `cat gs.pids`\n"
3313 system("chmod +x stopit");
3315 //-----------------------------------------------
// NOTE(review): the section below is intentionally commented out (the block
// comment opened on the next line closes beyond this excerpt). It generated a
// set_vinterface_hash.bat script of dagconfig commands that split hash load
// ranges across DAG virtual interfaces. Do not re-enable without review.
3317 /* For now disable support for virtual interfaces
3318 outfl = fopen("set_vinterface_hash.bat", "w");
3320 fprintf(stderr,"Can't open set_vinterface_hash.bat for write, exiting.\n");
3324 // The format should be determined by an entry in the ifres.xml file,
3325 // but for now hardcode the only example I have.
3326 for(ssi=base_vifaces.begin();ssi!=base_vifaces.end();++ssi){
3327 if(rts_hload.count((*ssi))){
3328 string iface_name = (*ssi);
// Extract the trailing (up to two-digit) device number from the interface name.
3329 string iface_number = "";
3330 for(j=iface_name.size()-1; j>0 && iface_number == ""; j--){
3331 if(isdigit(iface_name[j])){
3332 iface_number = iface_name[j];
3333 if(j>0 && isdigit(iface_name[j-1]))
3334 iface_number = iface_name[j-1] + iface_number;
3338 fprintf(outfl,"dagconfig -d%s -S hat_range=",iface_number.c_str());
3339 vector<int> halloc = rts_hload[iface_name];
3341 for(j=0;j<halloc.size();++j){
3344 fprintf(outfl,"%d-%d",prev_limit,halloc[j]);
3345 prev_limit = halloc[j];
3347 fprintf(outfl,"\n");
3351 system("chmod +x set_vinterface_hash.bat");
// NOTE(review): interior fragment of another routine (its signature is elided).
// Builds a local schema (qpSchema) for one query file: output table
// definitions of the split LFTA queries, plus any other referenced tables
// resolved from the global Schema.
3355 // Code for implementing a local schema
3357 table_list qpSchema;
3359 // Load the schemas of any LFTAs.
3361 for(l=0;l<hfta_nbr;++l){
3362 stream_query *sq0 = split_queries[l];
3363 table_def *td = sq0->get_output_tabledef();
3364 qpSchema.append_table(td);
3366 // load the schemas of any other ref'd tables.
3368 vector<tablevar_t *> input_tbl_names = split_queries[hfta_nbr]->get_input_tables();
3370 for(ti=0;ti<input_tbl_names.size();++ti){
// Prefer a table already in the local schema; otherwise fall back to the
// global Schema, and report an error if it is found in neither.
3371 int tbl_ref = qpSchema.get_table_ref(input_tbl_names[ti]->get_schema_name());
3373 tbl_ref = Schema->get_table_ref(input_tbl_names[ti]->get_schema_name());
3375 fprintf(stderr,"ERROR file %s references table %s, which is not in the schema.\n",input_file_names[i].c_str(), (input_tbl_names[ti]->get_schema_name()).c_str());
3378 qpSchema.append_table(Schema->get_table(tbl_ref));
3383 // Functions related to parsing.
// split_string : destructively tokenize instr in place on separator sep,
// storing up to max_words pointers into words[]; presumably returns the word
// count (return statement lies beyond this excerpt — confirm).
// NOTE(review): the overflow handling visible here resets nwords to
// max_words-1 and keeps scanning, so later words overwrite the last slot;
// verify this is the intended truncation behavior.
3386 static int split_string(char *instr,char sep, char **words,int max_words){
3392 words[nwords++] = str;
3393 while( (loc = strchr(str,sep)) != NULL){
3396 if(nwords >= max_words){
3397 fprintf(stderr,"Error in split_string, too many words discovered (max is %d)\n",max_words);
3398 nwords = max_words-1;
3400 words[nwords++] = str;