1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
16 #include<unistd.h> // for gethostname
19 #include "parse_fta.h"
20 #include "parse_schema.h"
21 #include "parse_ext_fcns.h"
22 #include"analyze_fta.h"
23 #include"query_plan.h"
24 #include"generate_lfta_code.h"
25 #include"stream_query.h"
26 #include"generate_utils.h"
28 #include"generate_nic_code.h"
39 #include <sys/types.h>
45 // to verify that some files exist.
46 #include <sys/types.h>
49 #include "parse_partn.h"
51 #include "print_plan.h"
53 // Interface to the xml parser
56 #include"field_list.h"
// Interface to the xml parser (used for field_list.xml and ifres.xml).
60 extern int xmlParserparse(void);
61 extern FILE *xmlParserin;
62 extern int xmlParserdebug;
// Attribute/value scratch vectors filled in by the xml parser actions.
64 std::vector<std::string> xml_attr_vec;
65 std::vector<std::string> xml_val_vec;
66 std::string xml_a, xml_v;
// Parsed XML tree; allocated in main when field_list.xml is found.
67 xml_t *xml_leaves = NULL;
69 // Interface to the field list verifier
70 field_list *field_verifier = NULL;
// Size of the reusable line/path buffer tmpstr in main.
72 #define TMPSTRLEN 1000
75 #define PATH_DELIM '/'
78 char tmp_schema_str[10000];
80 // Maximum delay between two heartbeats produced
81 // by a UDOP. Used when it's not explicitly
82 // provided in the udop definition
83 #define DEFAULT_UDOP_LIVENESS_TIMEOUT 5
85 // Default lfta hash table size, must be power of 2.
86 int DEFAULT_LFTA_HASH_TABLE_SIZE = 4096;
88 // Interface to FTA definition lexer and parser ...
90 extern int FtaParserparse(void);
91 extern FILE *FtaParserin;
92 extern int FtaParserdebug;
// Result slots populated by FtaParserparse(); reused for each parsed file.
94 fta_parse_t *fta_parse_result;
95 var_defs_t *fta_parse_defines;
99 // Interface to external function lexer and parser ...
101 extern int Ext_fcnsParserparse(void);
102 extern FILE *Ext_fcnsParserin;
103 extern int Ext_fcnsParserdebug;
// External function list built from external_fcns.def.
105 ext_fcn_list *Ext_fcns;
108 // Interface to partition definition parser
109 extern int PartnParserparse();
110 partn_def_list_t *partn_parse_result = NULL;
118 // forward declaration of local utility function
119 void generate_makefile(vector<string> &input_file_names, int nfiles,
120 vector<string> &hfta_names, opview_set &opviews,
121 vector<string> &machine_names,
122 string schema_file_name,
123 vector<string> &interface_names,
124 ifq_t *ifdb, string &config_dir_path,
127 map<string, vector<int> > &rts_hload
130 //static int split_string(char *instr,char sep, char **words,int max_words);
// Output stream for schema_summary.txt; opened in main when -f is given.
133 FILE *schema_summary_output = NULL; // query names
135 // Dump schema summary
136 void dump_summary(stream_query *str){
137 fprintf(schema_summary_output,"%s\n",str->query_name.c_str());
139 table_def *sch = str->get_output_tabledef();
141 vector<field_entry *> flds = sch->get_fields();
143 for(f=0;f<flds.size();++f){
144 if(f>0) fprintf(schema_summary_output,"|");
145 fprintf(schema_summary_output,"%s",flds[f]->get_name().c_str());
147 fprintf(schema_summary_output,"\n");
148 for(f=0;f<flds.size();++f){
149 if(f>0) fprintf(schema_summary_output,"|");
150 fprintf(schema_summary_output,"%s",flds[f]->get_type().c_str());
152 fprintf(schema_summary_output,"\n");
156 string hostname; // name of current host.
// When true (-S flag), the generated Makefile enables LFTA statistics.
158 bool generate_stats = false;
// Path to the root of the GS-lite tree; overridable with -R.
159 string root_path = "../..";
162 int main(int argc, char **argv){
163 char tmpstr[TMPSTRLEN];
167 set<int>::iterator si;
169 vector<string> registration_query_names; // for lfta.c registration
170 map<string, vector<int> > mach_query_names; // list queries of machine
171 vector<int> snap_lengths; // for lfta.c registration
172 vector<string> interface_names; // for lfta.c registration
173 vector<string> machine_names; // machine of interface
174 vector<bool> lfta_reuse_options; // for lfta.c registration
175 vector<int> lfta_liveness_timeouts; // for qtree.xml generation
176 vector<string> hfta_names; // hfta source code names, for
177 // creating make file.
178 vector<string> qnames; // ensure unique names
179 map<string, int> lfta_names; // keep track of unique lftas.
182 // set these to 1 to debug the parser
184 Ext_fcnsParserdebug = 0;
186 FILE *lfta_out; // lfta.c output.
187 FILE *fta_in; // input file
188 FILE *table_schemas_in; // source tables definition file
189 FILE *query_name_output; // query names
190 FILE *qtree_output; // interconnections of query nodes
192 // -------------------------------
193 // Handling of Input Arguments
194 // -------------------------------
195 char optstr[] = "BDpLC:l:HNQMf:PSh:n:cR:";
196 const char *usage_str = "Usage: %s [-B] [-D] [-p] [-L] [-N] [-H] [-Q] [-M] [-C <config directory>] [-l <library directory>] [-f] [-P] [-S] [-n n_virtual_interfaces] [-h hostname] [-R root_path] [schema_file] input_file [input file ...]\n"
197 "\t[-B] : debug only (don't create output files)\n"
198 "\t[-D] : distributed mode (will use cluster.ifq instead of local .ifq file)\n"
199 "\t[-p] : partitioned mode (will use partition information in partition.txt to drive query optimization)\n"
200 "\t[-L] : use live_hosts.txt file to restrict queries to a set of live hosts\n"
201 "\t[-C] : use <config directory> for definition files\n"
202 "\t[-l] : use <library directory> for library queries\n"
203 "\t[-N] : output query names in query_names.txt\n"
204 "\t[-H] : create HFTA only (no schema_file)\n"
205 "\t[-Q] : use query name for hfta suffix\n"
206 "\t[-M] : generate make file and runit, stopit scripts\n"
207 "\t[-S] : enable LFTA statistics (alters Makefile).\n"
208 "\t[-f] : Output schema summary to schema_summary.txt\n"
209 "\t[-P] : link with PADS\n"
210 "\t[-h] : override host name.\n"
211 "\t[-c] : clean out Makefile and hfta_*.cc first.\n"
212 "\t[-R] : path to root of GS-lite\n"
215 // parameters gathered from command line processing
216 string external_fcns_path;
217 // string internal_fcn_path;
218 string config_dir_path;
219 string library_path = "./";
220 vector<string> input_file_names;
221 string schema_file_name;
222 bool debug_only = false;
223 bool hfta_only = false;
224 bool output_query_names = false;
225 bool output_schema_summary=false;
226 bool numeric_hfta_flname = true;
227 bool create_makefile = false;
228 bool distributed_mode = false;
229 bool partitioned_mode = false;
230 bool use_live_hosts_file = false;
231 bool use_pads = false;
232 bool clean_make = false;
233 int n_virtual_interfaces = 1;
236 while((chopt = getopt(argc,argv,optstr)) != -1){
242 distributed_mode = true;
245 partitioned_mode = true;
248 use_live_hosts_file = true;
252 config_dir_path = string(optarg) + string("/");
256 library_path = string(optarg) + string("/");
259 output_query_names = true;
262 numeric_hfta_flname = false;
265 if(schema_file_name == ""){
270 output_schema_summary=true;
273 create_makefile=true;
294 n_virtual_interfaces = atoi(optarg);
295 if(n_virtual_interfaces < 1 || n_virtual_interfaces > 128){
296 fprintf(stderr,"Warning, %d virtual interfaces specified, valid values are between 1 and 128 inclusive\n",n_virtual_interfaces);
297 n_virtual_interfaces = 1;
302 fprintf(stderr,"Error, argument %c not recognized.\n",optopt);
303 fprintf(stderr,"%s\n", usage_str);
306 fprintf(stderr, "Argument was %c\n", optopt);
307 fprintf(stderr,"Invalid arguments\n");
308 fprintf(stderr,"%s\n", usage_str);
314 for (int i = 0; i < argc; ++i) {
315 if((schema_file_name == "") && !hfta_only){
316 schema_file_name = argv[i];
318 input_file_names.push_back(argv[i]);
322 if(input_file_names.size() == 0){
323 fprintf(stderr,"%s\n", usage_str);
328 string clean_cmd = "rm Makefile hfta_*.cc";
329 int clean_ret = system(clean_cmd.c_str());
331 fprintf(stderr,"Warning, return value %d when trying to clean out old source files.\n", clean_ret);
336 nic_prop_db *npdb = new nic_prop_db(config_dir_path);
338 // Open globally used file names.
340 // prepend config directory to schema file
341 schema_file_name = config_dir_path + schema_file_name;
342 external_fcns_path = config_dir_path + string("external_fcns.def");
343 string ifx_fname = config_dir_path + string("ifres.xml");
345 // Find interface query file(s).
347 gethostname(tmpstr,TMPSTRLEN);
350 hostname_len = strlen(tmpstr);
351 string ifq_fname = config_dir_path + (distributed_mode ? "cluster" : hostname) + string(".ifq");
352 vector<string> ifq_fls;
354 ifq_fls.push_back(ifq_fname);
357 // Get the field list, if it exists
358 string flist_fl = config_dir_path + "field_list.xml";
360 if((flf_in = fopen(flist_fl.c_str(), "r")) != NULL) {
361 fprintf(stderr,"Found field list file %s\n",flist_fl.c_str());
362 xml_leaves = new xml_t();
363 xmlParser_setfileinput(flf_in);
364 if(xmlParserparse()){
365 fprintf(stderr,"WARNING, could not parse field list file %s\n",flist_fl.c_str());
367 field_verifier = new field_list(xml_leaves);
372 if((table_schemas_in = fopen(schema_file_name.c_str(), "r")) == NULL) {
373 fprintf(stderr,"Can't open schema file %s\n%s\n",schema_file_name.c_str(),strerror(errno));
379 if(!(debug_only || hfta_only)){
380 if((lfta_out = fopen("lfta.c","w")) == NULL){
381 fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
387 // Get the output specification file.
389 // query, operator, operator_param, directory, bucketwidth, partitioning_se, n_partitions
390 string ospec_fl = "output_spec.cfg";
392 vector<ospec_str *> output_specs;
393 multimap<string, int> qname_to_ospec;
394 if((osp_in = fopen(ospec_fl.c_str(), "r")) != NULL) {
397 while(fgets(tmpstr,TMPSTRLEN,osp_in)){
399 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
401 // make operator type lowercase
403 for(tmpc=flds[1];*tmpc!='\0';++tmpc)
404 *tmpc = tolower(*tmpc);
406 ospec_str *tmp_ospec = new ospec_str();
407 tmp_ospec->query = flds[0];
408 tmp_ospec->operator_type = flds[1];
409 tmp_ospec->operator_param = flds[2];
410 tmp_ospec->output_directory = flds[3];
411 tmp_ospec->bucketwidth = atoi(flds[4]);
412 tmp_ospec->partitioning_flds = flds[5];
413 tmp_ospec->n_partitions = atoi(flds[6]);
414 qname_to_ospec.insert(pair<string,int>(tmp_ospec->query,output_specs.size()));
415 output_specs.push_back(tmp_ospec);
417 fprintf(stderr,"Warning, line %d corrupted in output_spec.cfg, has %d fields.\n",o_lineno,nflds);
422 fprintf(stderr,"output_spec.cfg not found. The query set has no output. exiting.\n");
427 string pspec_fl = "hfta_parallelism.cfg";
429 map<string, int> hfta_parallelism;
430 if((psp_in = fopen(pspec_fl.c_str(), "r")) != NULL){
433 while(fgets(tmpstr,TMPSTRLEN,psp_in)){
434 bool good_entry = true;
436 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
438 string hname = flds[0];
439 int par = atoi(flds[1]);
440 if(par <= 0 || par > n_virtual_interfaces){
441 fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must be between 1 and %d\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
444 if(good_entry && n_virtual_interfaces % par != 0){
445 fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must divide the number of virtual interfaces (%d), ignoring.\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
449 hfta_parallelism[hname] = par;
453 fprintf(stderr,"WARNING, no file %s found, using single copies of hftas.\n",pspec_fl.c_str());
457 // LFTA hash table sizes
458 string htspec_fl = "lfta_htsize.cfg";
459 FILE *htsp_in = NULL;
460 map<string, int> lfta_htsize;
461 if((htsp_in = fopen(htspec_fl.c_str(), "r")) != NULL){
464 while(fgets(tmpstr,TMPSTRLEN,htsp_in)){
465 bool good_entry = true;
467 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
469 string lfta_name = flds[0];
470 int htsz = atoi(flds[1]);
472 lfta_htsize[lfta_name] = htsz;
474 fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, htsize is %d, must be larger than 0.\n",o_lineno,htspec_fl.c_str(),htsz);
479 fprintf(stderr,"WARNING, no file %s found, using default LFTA hash table sizes.\n",htspec_fl.c_str());
482 // LFTA virtual interface hash split
483 string rtlspec_fl = "rts_load.cfg";
485 map<string, vector<int> > rts_hload;
486 if((rtl_in = fopen(rtlspec_fl.c_str(), "r")) != NULL){
491 while(fgets(tmpstr,TMPSTRLEN,rtl_in)){
492 bool good_entry = true;
496 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
498 iface_name = flds[0];
501 for(j=1;j<nflds;++j){
502 int h = atoi(flds[j]);
506 hload.push_back(cumm_h);
512 rts_hload[iface_name] = hload;
514 fprintf(stderr,"Warning, line %d of %s is malformed, skipping.\n",r_lineno, rtlspec_fl.c_str());
521 if(output_query_names){
522 if((query_name_output = fopen("query_names.txt","w")) == NULL){
523 fprintf(stderr,"Can't open output file %s\n%s\n","query_names.txt",strerror(errno));
528 if(output_schema_summary){
529 if((schema_summary_output = fopen("schema_summary.txt","w")) == NULL){
530 fprintf(stderr,"Can't open output file %s\n%s\n","schema_summary.txt",strerror(errno));
535 if((qtree_output = fopen("qtree.xml","w")) == NULL){
536 fprintf(stderr,"Can't open output file %s\n%s\n","qtree.xml",strerror(errno));
539 fprintf(qtree_output,"<?xml version='1.0' encoding='ISO-8859-1'?>\n");
540 fprintf(qtree_output,"<?xml-stylesheet type='text/xsl' href='qtree.xsl'?>\n");
541 fprintf(qtree_output,"<QueryNodes>\n");
544 // Get an initial Schema
547 // Parse the table schema definitions.
548 fta_parse_result = new fta_parse_t();
549 FtaParser_setfileinput(table_schemas_in);
550 if(FtaParserparse()){
551 fprintf(stderr,"Table schema parse failed.\n");
554 if(fta_parse_result->parse_type != TABLE_PARSE){
555 fprintf(stderr,"ERROR, file %s is not a table definition file.\n",schema_file_name.c_str());
558 Schema = fta_parse_result->tables;
560 // Ensure that all schema_ids, if set, are distinct.
561 // Obsolete? There is code elsewhere to ensure that schema IDs are
562 // distinct on a per-interface basis.
566 for(int t=0;t<Schema->size();++t){
567 int sch_id = Schema->get_table(t)->get_schema_id();
569 if(found_ids.find(sch_id) != found_ids.end()){
570 dup_ids.insert(sch_id);
572 found_ids.insert(sch_id);
575 if(dup_ids.size()>0){
576 fprintf(stderr, "Error, the schema has duplicate schema_ids:");
577 for(auto dit=dup_ids.begin();dit!=dup_ids.end(); ++dit)
578 fprintf(stderr," %d",(*dit));
579 fprintf(stderr,"\n");
586 // Process schema field inheritance
588 retval = Schema->unroll_tables(err_str);
590 fprintf(stderr,"Error processing schema field inheritance:\n %s\n", err_str.c_str() );
594 // hfta only => we will try to fetch schemas from the registry.
595 // therefore, start off with an empty schema.
596 Schema = new table_list();
600 // Open and parse the external functions file.
601 Ext_fcnsParserin = fopen(external_fcns_path.c_str(),"r");
602 if(Ext_fcnsParserin == NULL){
603 fprintf(stderr,"Warning, can't find external functions definition file (external_fcns.def).\n");
604 Ext_fcns = new ext_fcn_list();
606 if(Ext_fcnsParserparse()){
607 fprintf(stderr,"Warning, error parsing external functions definition file (external_fcns.def).\n");
608 Ext_fcns = new ext_fcn_list();
611 if(Ext_fcns->validate_fcns(err_str)){
612 fprintf(stderr,"Error processing external functions definition file (external_fcns.def):\n%s\n",err_str.c_str());
616 // Open and parse the interface resources file.
617 // ifq_t *ifaces_db = new ifq_t();
619 // if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
620 // fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
621 // ifx_fname.c_str(), ierr.c_str());
624 // if(ifaces_db->load_ifqs(ifq_fname, ierr)){
625 // fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
626 // ifq_fname.c_str(), ierr.c_str());
631 // The LFTA code string.
632 // Put the standard preamble here.
633 // NOTE: the hash macros, fcns should go into the run time
634 map<string, string> lfta_val;
635 map<string, string> lfta_prefilter_val;
638 "#include <limits.h>\n"
639 "#include \"rts.h\"\n"
640 "#include \"fta.h\"\n"
641 "#include \"lapp.h\"\n"
642 "#include \"rts_udaf.h\"\n"
643 "#include<stdio.h>\n"
644 "#include <float.h>\n"
645 "#include \"rdtsc.h\"\n"
646 "#include \"watchlist.h\"\n\n"
649 // Get any locally defined parsing headers
651 memset(&glob_result, 0, sizeof(glob_result));
653 // do the glob operation TODO should be from GSROOT
654 int return_value = glob("../../include/lfta/local/*h", GLOB_TILDE, NULL, &glob_result);
655 if(return_value == 0){
657 for(size_t i = 0; i < glob_result.gl_pathc; ++i) {
659 int nflds = split_string(glob_result.gl_pathv[i],'/',flds,1000);
660 lfta_header += "#include \"local/"+string(flds[nflds-1])+"\"\n";
664 fprintf(stderr,"Warning, glob on ../../include/lfta/local/*h failed.\n");
668 "#define IS_FILLED(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> ((bucket & 15)<<1)))\n"
669 "#define IS_NEW(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> (((bucket & 15) << 1) + 1)))\n"
670 "#define SET_EMPTY(bitmap,bucket) (bitmap[bucket >> 4] &= (~(0x80000000 >> ((bucket & 15)<<1))))\n"
671 "#define SET_FILLED_AND_NEW(bitmap,bucket) (bitmap[bucket >> 4] |= (0xC0000000 >> ((bucket & 15)<<1)))\n"
676 "gs_uint64_t (*lfta_prefilter)(void *pkt) = NULL; // set at fta_init\n"
678 "#define SLOT_FILLED 0x04\n"
679 "#define SLOT_GEN_BITS 0x03\n"
680 "#define SLOT_HASH_BITS 0xfffffff8\n"
681 "#define SET_BF_BIT(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num)) + (bf_index)) >> 3] |= (0x80 >> ((((bucket) * (bf_num)) + (bf_index)) & 7)))\n"
682 "#define IS_BF_SET(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num))+(bf_index)) >> 3] & (0x80 >> ((((bucket) * (bf_num))+(bf_index)) & 7)))\n"
683 "#define SET_BF_EMPTY(bf_table, bf_num, bf_index, bucket) (bf_table[((bucket * bf_num)+bf_index) >> 3] &= (~0x80 >> (((bucket * bf_num)+bf_index) & 7)))\n"
686 "#define lfta_BOOL_to_hash(x) (x)\n"
687 "#define lfta_USHORT_to_hash(x) (x)\n"
688 "#define lfta_UINT_to_hash(x) (x)\n"
689 "#define lfta_IP_to_hash(x) (x)\n"
690 "#define lfta_IPV6_to_hash(x) ( (x.v[0]) ^ (x.v[1]) ^ (x.v[2]) ^ (x.v[3]))\n"
691 "#define lfta_INT_to_hash(x) (gs_uint32_t)(x)\n"
692 "#define lfta_ULLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
693 "#define lfta_LLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
694 "#define lfta_FLOAT_to_hash(x) ( (( ((gs_uint64_t)(x)) >>32)&0xffffffff) ^ (((gs_uint64_t)(x))&0xffffffff) )\n"
695 "static gs_uint32_t lfta_V_STR_to_hash(struct gs_string x){\n"
696 " gs_uint32_t i,ret=0,tmp_sum = 0;\n"
697 " for(i=0;i<x.length;++i){\n"
698 " tmp_sum |= (x.data[i]) << (8*(i%4));\n"
704 " if((i%4)!=0) ret ^=tmp_sum;\n"
710 //////////////////////////////////////////////////////////////////
711 ///// Get all of the query parse trees
715 int hfta_count = 0; // for numeric suffixes to hfta .cc files
717 //---------------------------
718 // Global info needed for post processing.
720 // Set of operator views ref'd in the query set.
722 // lftas on a per-machine basis.
723 map<string, vector<stream_query *> > lfta_mach_lists;
724 int nfiles = input_file_names.size();
725 vector<stream_query *> hfta_list; // list of hftas.
726 map<string, stream_query *> sq_map; // map from query name to stream query.
729 //////////////////////////////////////////
731 // Open and parse the interface resources file.
732 ifq_t *ifaces_db = new ifq_t();
734 if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
735 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
736 ifx_fname.c_str(), ierr.c_str());
739 if(ifaces_db->load_ifqs(ifq_fls[0], ierr)){
740 fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
741 ifq_fls[0].c_str(), ierr.c_str());
745 map<string, string> qname_to_flname; // for detecting duplicate query names
749 // Parse the files to create a vector of parse trees.
750 // Load qnodes with information to perform a topo sort
751 // based on query dependencies.
752 vector<query_node *> qnodes; // for topo sort.
753 map<string,int> name_node_map; // map query name to qnodes entry
754 for(i=0;i<input_file_names.size();i++){
756 if((fta_in = fopen(input_file_names[i].c_str(), "r")) == NULL) {
757 fprintf(stderr,"Can't open input file %s\n%s",input_file_names[i].c_str(),strerror(errno));
760 fprintf(stderr,"Parsing file %s\n",input_file_names[i].c_str());
762 // Parse the FTA query
763 fta_parse_result = new fta_parse_t();
764 FtaParser_setfileinput(fta_in);
765 if(FtaParserparse()){
766 fprintf(stderr,"FTA parse failed.\n");
769 if(fta_parse_result->parse_type != QUERY_PARSE){
770 fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
774 // returns a list of parse trees
775 vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
776 for(p=0;p<qlist.size();++p){
777 table_exp_t *fta_parse_tree = qlist[p];
778 // query_parse_trees.push_back(fta_parse_tree);
780 // compute the default name -- extract from query name
781 strcpy(tmpstr,input_file_names[i].c_str());
782 char *qname = strrchr(tmpstr,PATH_DELIM);
787 char *qname_end = strchr(qname,'.');
788 if(qname_end != NULL) *qname_end = '\0';
789 string qname_str = qname;
790 string imputed_qname = impute_query_name(fta_parse_tree, qname_str);
792 // Determine visibility. Should I be attaching all of the output methods?
793 if(qname_to_ospec.count(imputed_qname)>0)
794 fta_parse_tree->set_visible(true);
796 fta_parse_tree->set_visible(false);
799 // Create a manipulable representation of the parse tree.
800 // the qnode inherits the visibility assigned to the parse tree.
801 int pos = qnodes.size();
802 qnodes.push_back( new query_node(pos, imputed_qname, input_file_names[i], fta_parse_tree ));
803 name_node_map[ qnodes[pos]->name ] = pos;
804 //printf("name_node_map[ %s ] = %d\n",qnodes[pos]->name.c_str(),pos);
805 // qnames.push_back(impute_query_name(fta_parse_tree, qname_str));
806 // qfiles.push_back(i);
808 // Check for duplicate query names
809 // NOTE : in hfta-only generation, I should
810 // also check with the names of the registered queries.
811 if(qname_to_flname.count(qnodes[pos]->name) > 0){
812 fprintf(stderr,"ERROR duplicate query name %s in files %s and %s.\n",
813 qnodes[pos]->name.c_str(), (qname_to_flname[qnodes[pos]->name]).c_str(), input_file_names[i].c_str());
816 if(Schema->find_tbl(qnodes[pos]->name) >= 0){
817 fprintf(stderr,"ERROR duplicate query name %s (file %s) was already defined as a PROTOCOL.\n",
818 qnodes[pos]->name.c_str(), input_file_names[i].c_str());
821 qname_to_flname[qnodes[pos]->name] = input_file_names[i].c_str();
827 // Add the library queries
830 for(pos=0;pos<qnodes.size();++pos){
832 for(fi = 0;fi<qnodes[pos]->refd_tbls.size();++fi){
833 string src_tbl = qnodes[pos]->refd_tbls[fi];
834 if(qname_to_flname.count(src_tbl) == 0){
835 int last_sep = src_tbl.find_last_of('/');
836 if(last_sep != string::npos){
837 fprintf(stderr,"Finding library query %s for query %s\n",src_tbl.c_str(),qnodes[pos]->name.c_str());
838 string target_qname = src_tbl.substr(last_sep+1);
839 string qpathname = library_path + src_tbl + ".gsql";
840 if((fta_in = fopen(qpathname.c_str(), "r")) == NULL) {
841 fprintf(stderr,"Can't open library file %s, referenced by query %s in file %s\n\t%s\n",qpathname.c_str(),qnodes[pos]->name.c_str(), qname_to_flname[qnodes[pos]->name].c_str(), strerror(errno));
843 fprintf(stderr,"After exit\n");
845 fprintf(stderr,"Parsing file %s\n",qpathname.c_str());
846 // Parse the FTA query
847 fta_parse_result = new fta_parse_t();
848 FtaParser_setfileinput(fta_in);
849 if(FtaParserparse()){
850 fprintf(stderr,"FTA parse failed.\n");
853 if(fta_parse_result->parse_type != QUERY_PARSE){
854 fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
858 map<string, int> local_query_map;
859 vector<string> local_query_names;
860 vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
861 for(p=0;p<qlist.size();++p){
862 table_exp_t *fta_parse_tree = qlist[p];
863 fta_parse_tree->set_visible(false); // assumed to not produce output
864 string imputed_qname = impute_query_name(fta_parse_tree, target_qname);
865 if(imputed_qname == target_qname)
866 imputed_qname = src_tbl;
867 if(local_query_map.count(imputed_qname)>0){
868 fprintf(stderr,"ERROR, multiple queries named %s in library file %s\n",imputed_qname.c_str(), qpathname.c_str());
871 local_query_map[ imputed_qname ] = p;
872 local_query_names.push_back(imputed_qname);
875 if(local_query_map.count(src_tbl)==0){
876 fprintf(stderr,"ERROR, library query file %s has no query named %s\n",qpathname.c_str(),target_qname.c_str());
880 vector<int> worklist;
881 set<int> added_queries;
882 vector<query_node *> new_qnodes;
883 worklist.push_back(local_query_map[target_qname]);
884 added_queries.insert(local_query_map[target_qname]);
886 int qpos = qnodes.size();
887 for(qq=0;qq<worklist.size();++qq){
888 int q_id = worklist[qq];
889 query_node *new_qnode = new query_node(qpos+qq, local_query_names[q_id], qpathname, qlist[q_id] );
890 new_qnodes.push_back( new_qnode);
891 vector<string> refd_tbls = new_qnode->refd_tbls;
893 for(ff = 0;ff<refd_tbls.size();++ff){
894 if(local_query_map.count(refd_tbls[ff])>0 && added_queries.count(local_query_map[refd_tbls[ff]])==0){
896 if(name_node_map.count(refd_tbls[ff])>0){
897 fprintf(stderr,"ERROR, query %s occurs both in the regular query set, file %s, and in library file %s\n",refd_tbls[ff].c_str(), qname_to_flname[refd_tbls[ff]].c_str(), qpathname.c_str() );
900 worklist.push_back(local_query_map[refd_tbls[ff]]);
906 for(qq=0;qq<new_qnodes.size();++qq){
907 int qpos = qnodes.size();
908 qnodes.push_back(new_qnodes[qq]);
909 name_node_map[qnodes[qpos]->name ] = qpos;
910 qname_to_flname[qnodes[qpos]->name ] = qpathname;
924 //---------------------------------------
929 string udop_missing_sources;
930 for(i=0;i<qnodes.size();++i){
932 for(fi = 0;fi<qnodes[i]->refd_tbls.size();++fi){
933 int sid = Schema->find_tbl(qnodes[i]->refd_tbls[fi]);
935 if(Schema->get_schema_type(sid) == OPERATOR_VIEW_SCHEMA){
936 if(name_node_map.count(qnodes[i]->refd_tbls[fi]) == 0){
937 int pos = qnodes.size();
938 qnodes.push_back( new query_node(pos, qnodes[i]->refd_tbls[fi], Schema));
939 name_node_map[ qnodes[pos]->name ] = pos;
940 qnodes[pos]->is_externally_visible = false; // its visible
941 // Need to mark the source queries as visible.
943 string missing_sources = "";
944 for(si=0;si<qnodes[pos]->refd_tbls.size();++si){
945 string src_tbl = qnodes[pos]->refd_tbls[si];
946 if(name_node_map.count(src_tbl)==0){
947 missing_sources += src_tbl + " ";
950 if(missing_sources != ""){
951 udop_missing_sources += "\tUDOP "+qnodes[pos]->name+" references undefined tables "+missing_sources+"\n";
958 if(udop_missing_sources != ""){
959 fprintf(stderr,"ERROR, User Defined OPerators reference source tables that are not part of the query set:\n%s",udop_missing_sources.c_str());
965 ////////////////////////////////////////////////////////////////////
966 /// Check parse trees to verify that some
967 /// global properties are met :
968 /// if q1 reads from q2, then
969 /// q2 is processed before q1
970 /// q1 can supply q2's parameters
971 /// Verify there is no cycle in the reads-from graph.
973 // Compute an order in which to process the
976 // Start by building the reads-from lists.
979 for(i=0;i<qnodes.size();++i){
981 vector<string> refd_tbls = qnodes[i]->refd_tbls;
982 for(fi = 0;fi<refd_tbls.size();++fi){
983 if(name_node_map.count(refd_tbls[fi])>0){
984 //printf("query %d (%s) reads from %s (%d)\n", i, qnodes[i]->name.c_str(),refd_tbls[fi].c_str(),name_node_map[refd_tbls[fi]]);
985 (qnodes[i]->reads_from).insert(name_node_map[refd_tbls[fi]]);
991 // If one query reads the result of another,
992 // check for parameter compatibility. Currently it must
993 // be an exact match. I will move to requiring
994 // containment after re-ordering, but will require
995 // some analysis for code generation which is not
997 //printf("There are %d query nodes.\n",qnodes.size());
1000 for(i=0;i<qnodes.size();++i){
1001 vector<var_pair_t *> target_params = qnodes[i]->params;
1002 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1003 vector<var_pair_t *> source_params = qnodes[(*si)]->params;
1004 if(target_params.size() != source_params.size()){
1005 fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
1009 for(p=0;p<target_params.size();++p){
1010 if(! (target_params[p]->name == source_params[p]->name &&
1011 target_params[p]->val == source_params[p]->val ) ){
1012 fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
1021 // Start by counting inedges.
1022 for(i=0;i<qnodes.size();++i){
1023 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1024 qnodes[(*si)]->n_consumers++;
1028 // The roots are the nodes with indegree zero.
1030 for(i=0;i<qnodes.size();++i){
1031 if(qnodes[i]->n_consumers == 0){
1032 if(qnodes[i]->is_externally_visible == false){
1033 fprintf(stderr,"WARNING: query %s (file %s) is a root query but it isn't visible. Ignored.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str());
1039 // Remove the parts of the subtree that produce no output.
1040 set<int> valid_roots;
1041 set<int> discarded_nodes;
1042 set<int> candidates;
1043 while(roots.size() >0){
1044 for(si=roots.begin();si!=roots.end();++si){
1045 if(qnodes[(*si)]->is_externally_visible){
1046 valid_roots.insert((*si));
1048 discarded_nodes.insert((*si));
1049 set<int>::iterator sir;
1050 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1051 qnodes[(*sir)]->n_consumers--;
1052 if(qnodes[(*sir)]->n_consumers == 0)
1053 candidates.insert( (*sir));
1060 roots = valid_roots;
1061 if(discarded_nodes.size()>0){
1062 fprintf(stderr,"Warning, the following queries were discarded because they produce no output:\n");
1064 for(si=discarded_nodes.begin();si!=discarded_nodes.end();++si){
1065 if(di>0 && (di%8)==0) fprintf(stderr,"\n");
1067 fprintf(stderr," %s",qnodes[(*si)]->name.c_str());
1069 fprintf(stderr,"\n");
1072 // Compute the sources_to set, ignoring discarded nodes.
1073 for(i=0;i<qnodes.size();++i){
1074 if(discarded_nodes.count(i)==0)
1075 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1076 qnodes[(*si)]->sources_to.insert(i);
1081 // Find the nodes that are shared by multiple visible subtrees.
1082 // The roots become inferred visible nodes.
1084 // Find the visible nodes.
1085 vector<int> visible_nodes;
1086 for(i=0;i<qnodes.size();i++){
1087 if(qnodes[i]->is_externally_visible){
1088 visible_nodes.push_back(i);
1092 // Find UDOPs referenced by visible nodes.
1094 for(i=0;i<visible_nodes.size();++i){
1095 workq.push_back(visible_nodes[i]);
1097 while(!workq.empty()){
1098 int node = workq.front();
1100 set<int>::iterator children;
1101 if(qnodes[node]->is_udop && qnodes[node]->is_externally_visible == false){
1102 qnodes[node]->is_externally_visible = true;
1103 visible_nodes.push_back(node);
1104 for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1105 if(qnodes[(*children)]->is_externally_visible == false){
1106 qnodes[(*children)]->is_externally_visible = true;
1107 visible_nodes.push_back((*children));
1111 for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1112 workq.push_back((*children));
1119 for(i=0;i<qnodes.size();i++){
1120 qnodes[i]->subtree_roots.clear();
1123 // Walk the tree defined by a visible node, not descending into
1124 // subtrees rooted by a visible node. Mark the node visited with
1125 // the visible node ID.
1126 for(i=0;i<visible_nodes.size();++i){
1128 vroots.insert(visible_nodes[i]);
1129 while(vroots.size()>0){
1130 for(si=vroots.begin();si!=vroots.end();++si){
1131 qnodes[(*si)]->subtree_roots.insert(visible_nodes[i]);
1133 set<int>::iterator sir;
1134 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1135 if(! qnodes[(*sir)]->is_externally_visible){
1136 candidates.insert( (*sir));
1140 vroots = candidates;
1144 // Find the nodes in multiple visible node subtrees, but with no parent
1145 // that is in multiple visible node subtrees. Mark these as inferred visible nodes.
1146 done = true; // until proven otherwise
1147 for(i=0;i<qnodes.size();i++){
1148 if(qnodes[i]->subtree_roots.size()>1){
1149 bool is_new_root = true;
1150 set<int>::iterator sir;
1151 for(sir=qnodes[i]->sources_to.begin(); sir!=qnodes[i]->sources_to.end();++sir){
1152 if(qnodes[(*sir)]->subtree_roots.size()>1)
1153 is_new_root = false;
1156 qnodes[i]->is_externally_visible = true;
1157 qnodes[i]->inferred_visible_node = true;
1158 visible_nodes.push_back(i);
1169 // get visible nodes in topo ordering.
1170 // for(i=0;i<qnodes.size();i++){
1171 // qnodes[i]->n_consumers = qnodes[i]->sources_to.size();
1173 vector<int> process_order;
1174 while(roots.size() >0){
1175 for(si=roots.begin();si!=roots.end();++si){
1176 if(discarded_nodes.count((*si))==0){
1177 process_order.push_back( (*si) );
1179 set<int>::iterator sir;
1180 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1181 qnodes[(*sir)]->n_consumers--;
1182 if(qnodes[(*sir)]->n_consumers == 0)
1183 candidates.insert( (*sir));
1191 //printf("process_order.size() =%d\n",process_order.size());
1193 // Search for cyclic dependencies
1195 for(i=0;i<qnodes.size();++i){
1196 if(discarded_nodes.count(i)==0 && qnodes[i]->n_consumers > 0){
1197 if(found_dep.size() != 0) found_dep += ", ";
1198 found_dep += "query "+qnodes[i]->name+" (file "+qnodes[i]->file+")";
1201 if(found_dep.size()>0){
1202 fprintf(stderr,"ERROR, the following queries contain a cyclic reads-from dependency:\n%s\n",found_dep.c_str());
1206 // Get a list of query sets, in the order to be processed.
1207 // Start at visible root and do bfs.
1208 // The query set includes queries referenced indirectly,
1209 // as sources for user-defined operators. These are needed
1210 // to ensure that they are added to the schema, but are not part
1211 // of the query tree.
1213 // stream_node_sets contains queries reachable only through the
1214 // FROM clause, so I can tell which queries to add to the stream
1215 // query. (DISABLED, UDOPS are integrated, does this cause problems?)
1217 // NOTE: this code works because in order for data to be
1218 // read by multiple hftas, the node must be externally visible.
1219 // But visible nodes define roots of process sets.
1220 // internally visible nodes can feed data only
1221 // to other nodes in the same query file.
1222 // Therefore, any access can be restricted to a file,
1223 // hfta output sharing is done only on roots
1224 // never on interior nodes.
1229 // Compute the base collection of hftas.
1230 vector<hfta_node *> hfta_sets;
1231 map<string, int> hfta_name_map;
1232 // vector< vector<int> > process_sets;
1233 // vector< set<int> > stream_node_sets;
1234 reverse(process_order.begin(), process_order.end()); // get listing in reverse order.
1235 // i.e. process leaves 1st.
1236 for(i=0;i<process_order.size();++i){
1237 if(qnodes[process_order[i]]->is_externally_visible == true){
1238 //printf("Visible.\n");
1239 int root = process_order[i];
1240 hfta_node *hnode = new hfta_node();
1241 hnode->name = qnodes[root]-> name;
1242 hnode->source_name = qnodes[root]-> name;
1243 hnode->is_udop = qnodes[root]->is_udop;
1244 hnode->inferred_visible_node = qnodes[root]->inferred_visible_node;
1246 vector<int> proc_list; proc_list.push_back(root);
1247 // Ensure that nodes are added only once.
1248 set<int> proc_set; proc_set.insert(root);
1249 roots.clear(); roots.insert(root);
1251 while(roots.size()>0){
1252 for(si=roots.begin();si!=roots.end();++si){
1253 //printf("Processing root %d\n",(*si));
1254 set<int>::iterator sir;
1255 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1256 //printf("reads fom %d\n",(*sir));
1257 if(qnodes[(*sir)]->is_externally_visible==false){
1258 candidates.insert( (*sir) );
1259 if(proc_set.count( (*sir) )==0){
1260 proc_set.insert( (*sir) );
1261 proc_list.push_back( (*sir) );
1270 reverse(proc_list.begin(), proc_list.end());
1271 hnode->query_node_indices = proc_list;
1272 hfta_name_map[hnode->name] = hfta_sets.size();
1273 hfta_sets.push_back(hnode);
1277 // Compute the reads_from / sources_to graphs for the hftas.
1279 for(i=0;i<hfta_sets.size();++i){
1280 hfta_node *hnode = hfta_sets[i];
1281 for(q=0;q<hnode->query_node_indices.size();q++){
1282 query_node *qnode = qnodes[hnode->query_node_indices[q]];
1283 for(s=0;s<qnode->refd_tbls.size();++s){
1284 if(hfta_name_map.count(qnode->refd_tbls[s])){
1285 int other_hfta = hfta_name_map[qnode->refd_tbls[s]];
1286 hnode->reads_from.insert(other_hfta);
1287 hfta_sets[other_hfta]->sources_to.insert(i);
1293 // Compute a topological sort of the hfta_sets.
1295 vector<int> hfta_topsort;
1297 int hnode_srcs[hfta_sets.size()];
1298 for(i=0;i<hfta_sets.size();++i){
1300 if(hfta_sets[i]->sources_to.size() == 0)
1304 while(! workq.empty()){
1305 int node = workq.front();
1307 hfta_topsort.push_back(node);
1308 set<int>::iterator stsi;
1309 for(stsi=hfta_sets[node]->reads_from.begin();stsi!=hfta_sets[node]->reads_from.end();++stsi){
1310 int parent = (*stsi);
1311 hnode_srcs[parent]++;
1312 if(hnode_srcs[parent] == hfta_sets[parent]->sources_to.size()){
1313 workq.push_back(parent);
1318 // Decorate hfta nodes with the level of parallelism given as input.
1320 map<string, int>::iterator msii;
1321 for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1322 string hfta_name = (*msii).first;
1323 int par = (*msii).second;
1324 if(hfta_name_map.count(hfta_name) > 0){
1325 hfta_sets[ hfta_name_map[hfta_name] ]->n_parallel = par;
1327 fprintf(stderr,"Warning, hfta_parallelism.cfg has an entry for %s, but its not a hfta.\n",hfta_name.c_str());
1331 // Propagate levels of parallelism: children should have a level of parallelism
1332 // as large as any of its parents. Adjust children upwards to compensate.
1333 // Start at parents and adjust children, auto-propagation will occur.
1335 for(i=hfta_sets.size()-1;i>=0;i--){
1336 set<int>::iterator stsi;
1337 for(stsi=hfta_sets[i]->reads_from.begin();stsi!=hfta_sets[i]->reads_from.end();++stsi){
1338 if(hfta_sets[i]->n_parallel > hfta_sets[ (*stsi) ]->n_parallel){
1339 hfta_sets[ (*stsi) ]->n_parallel = hfta_sets[i]->n_parallel;
1344 // Before all the name mangling, check if there are any output_spec.cfg
1345 // or hfta_parallelism.cfg entries that do not have a matching query.
1347 string dangling_ospecs = "";
1348 for(msii=qname_to_ospec.begin();msii!=qname_to_ospec.end();++msii){
1349 string oq = (*msii).first;
1350 if(hfta_name_map.count(oq) == 0){
1351 dangling_ospecs += " "+(*msii).first;
1354 if(dangling_ospecs!=""){
1355 fprintf(stderr,"WARNING, the following entries in output_spec.cfg don't have a matching query: %s\n",dangling_ospecs.c_str());
1359 string dangling_par = "";
1360 for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1361 string oq = (*msii).first;
1362 if(hfta_name_map.count(oq) == 0){
1363 dangling_par += " "+(*msii).first;
1366 if(dangling_par!=""){
1367 fprintf(stderr,"WARNING, the following entries in hfta_parallelism.cfg don't have a matching query: %s\n",dangling_par.c_str());
1372 // Replicate parallelized hftas. Do __copyX name mangling. Adjust
1373 // FROM clauses: retarget any name which is an internal node, and
1374 // any which is in hfta_sets (and which is parallelized). Add Merge nodes
1375 // when the source hfta has more parallelism than the target node.
1376 // Add new nodes to qnodes and hfta_nodes with the appropriate linkages.
1379 int n_original_hfta_sets = hfta_sets.size();
1380 for(i=0;i<n_original_hfta_sets;++i){
1381 if(hfta_sets[i]->n_parallel > 1){
1382 hfta_sets[i]->do_generation =false; // set the deletion flag for this entry.
1383 set<string> local_nodes; // names of query nodes in the hfta.
1384 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1385 local_nodes.insert(qnodes[ hfta_sets[i]->query_node_indices[h] ]->name);
1388 for(p=0;p<hfta_sets[i]->n_parallel;++p){
1389 string mangler = "__copy"+int_to_string(p);
1390 hfta_node *par_hfta = new hfta_node();
1391 par_hfta->name = hfta_sets[i]->name + mangler;
1392 par_hfta->source_name = hfta_sets[i]->name;
1393 par_hfta->is_udop = hfta_sets[i]->is_udop;
1394 par_hfta->inferred_visible_node = hfta_sets[i]->inferred_visible_node;
1395 par_hfta->n_parallel = hfta_sets[i]->n_parallel;
1396 par_hfta->parallel_idx = p;
1398 map<string, int> par_qnode_map; // qnode name-to-idx, aids dependency tracking.
1401 if(hfta_sets[i]->is_udop){
1402 int root = hfta_sets[i]->query_node_indices[0];
1404 string unequal_par_sources;
1405 set<int>::iterator rfsii;
1406 for(rfsii=hfta_sets[i]->reads_from.begin();rfsii!=hfta_sets[i]->reads_from.end();++rfsii){
1407 if(hfta_sets[(*rfsii)]->n_parallel != par_hfta->n_parallel){
1408 unequal_par_sources = hfta_sets[(*rfsii)]->name+" ("+int_to_string(hfta_sets[(*rfsii)]->n_parallel)+") ";
1411 if(unequal_par_sources != ""){
1412 fprintf(stderr,"ERROR, UDOP %s has parallelism %d, but some of its sources have a different parallelism: %s\n",hfta_sets[i]->name.c_str(), hfta_sets[i]->n_parallel, unequal_par_sources.c_str());
1417 vector<string> new_sources;
1418 for(rti=0;rti<qnodes[root]->refd_tbls.size();++rti){
1419 new_sources.push_back(qnodes[root]->refd_tbls[rti]+mangler);
1422 query_node *new_qn = new query_node(qnodes.size(), qnodes[root]->name, Schema);
1423 new_qn->name += mangler;
1424 new_qn->mangler = mangler;
1425 new_qn->refd_tbls = new_sources;
1426 par_hfta->query_node_indices.push_back(qnodes.size());
1427 par_qnode_map[new_qn->name] = qnodes.size();
1428 name_node_map[ new_qn->name ] = qnodes.size();
1429 qnodes.push_back(new_qn);
1431 // regular query node
1432 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1433 int hqn_idx = hfta_sets[i]->query_node_indices[h];
1434 table_exp_t *dup_pt = dup_table_exp(qnodes[hqn_idx]->parse_tree);
1435 // rehome the from clause on mangled names.
1436 // create merge nodes as needed for external sources.
1437 for(f=0;f<dup_pt->fm->tlist.size();++f){
1438 if(local_nodes.count(dup_pt->fm->tlist[f]->schema_name)>0){
1439 dup_pt->fm->tlist[f]->schema_name += mangler;
1440 }else if(hfta_name_map.count(dup_pt->fm->tlist[f]->schema_name)>0){
1441 // Ref's an external HFTA. No parallelism => leave as is. Else, if level of parallelism of the two hftas is the same, rename by the mangler. Else, there must be more sources, so create a merge node.
1442 int other_hidx = hfta_name_map[dup_pt->fm->tlist[f]->schema_name];
1443 if(par_hfta->n_parallel == hfta_sets[other_hidx]->n_parallel){
1444 dup_pt->fm->tlist[f]->schema_name += mangler;
1446 vector<string> src_tbls;
1447 int stride = hfta_sets[other_hidx]->n_parallel / par_hfta->n_parallel;
1449 fprintf(stderr,"INTERNAL ERROR, parent hfta %s has a larger parallelism than its child %s\n",par_hfta->name.c_str(), hfta_sets[other_hidx]->name.c_str());
1452 for(s=0;s<stride;++s){
1453 string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s+stride*p);
1454 src_tbls.push_back(ext_src_name);
1456 table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1457 string merge_node_name = qnodes[hqn_idx]->name+"__copy"+int_to_string(p)+"__merge"+int_to_string(f);
1458 dup_pt->fm->tlist[f]->schema_name = merge_node_name;
1459 // Make a qnode to represent the new merge node
1460 query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1461 qn_pt->refd_tbls = src_tbls;
1462 qn_pt->is_udop = false;
1463 qn_pt->is_externally_visible = false;
1464 qn_pt->inferred_visible_node = false;
1465 par_hfta->query_node_indices.push_back(qnodes.size());
1466 par_qnode_map[merge_node_name] = qnodes.size();
1467 name_node_map[ merge_node_name ] = qnodes.size();
1468 qnodes.push_back(qn_pt);
1472 query_node *new_qn = new query_node(qnodes.size(),qnodes[hqn_idx]->name+"__copy"+int_to_string(p),qnodes[hqn_idx]->file,dup_pt);
1473 for(f=0;f<dup_pt->fm->tlist.size();++f){
1474 new_qn->refd_tbls.push_back(dup_pt->fm->tlist[f]->schema_name);
1476 new_qn->params = qnodes[hqn_idx]->params;
1477 new_qn->is_udop = false;
1478 new_qn->is_externally_visible = qnodes[hqn_idx]->is_externally_visible;
1479 new_qn->inferred_visible_node = qnodes[hqn_idx]->inferred_visible_node;
1480 par_hfta->query_node_indices.insert(par_hfta->query_node_indices.begin(),qnodes.size());
1481 par_qnode_map[new_qn->name] = qnodes.size();
1482 name_node_map[ new_qn->name ] = qnodes.size();
1483 qnodes.push_back(new_qn);
1486 hfta_name_map[par_hfta->name] = hfta_sets.size();
1487 hfta_sets.push_back(par_hfta);
1490 // This hfta isn't being parallelized, but add merge nodes for any parallelized
1492 if(!hfta_sets[i]->is_udop){
1493 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1494 int hqn_idx = hfta_sets[i]->query_node_indices[h];
1495 for(f=0;f<qnodes[hqn_idx]->parse_tree->fm->tlist.size();++f){
1496 if(hfta_name_map.count(qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name)>0){
1497 // Ref's an external HFTA. No parallelism => leave as is. Else, if level of parallelism of the two hftas is the same, rename by the mangler. Else, there must be more sources, so create a merge node.
1498 int other_hidx = hfta_name_map[qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name];
1499 if(hfta_sets[i]->n_parallel != hfta_sets[other_hidx]->n_parallel){
1500 vector<string> src_tbls;
1501 for(s=0;s<hfta_sets[other_hidx]->n_parallel;++s){
1502 string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s);
1503 src_tbls.push_back(ext_src_name);
1505 table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1506 string merge_node_name = qnodes[hqn_idx]->name+"__merge"+int_to_string(f);
1507 qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name = merge_node_name;
1508 // Make a qnode to represent the new merge node
1509 query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1510 qn_pt->refd_tbls = src_tbls;
1511 qn_pt->is_udop = false;
1512 qn_pt->is_externally_visible = false;
1513 qn_pt->inferred_visible_node = false;
1514 hfta_sets[i]->query_node_indices.insert(hfta_sets[i]->query_node_indices.begin(),qnodes.size());
1515 name_node_map[ merge_node_name ] = qnodes.size();
1516 qnodes.push_back(qn_pt);
1525 // Rebuild the reads_from / sources_to lists in the qnodes
1526 for(q=0;q<qnodes.size();++q){
1527 qnodes[q]->reads_from.clear();
1528 qnodes[q]->sources_to.clear();
1530 for(q=0;q<qnodes.size();++q){
1531 for(s=0;s<qnodes[q]->refd_tbls.size();++s){
1532 if(name_node_map.count(qnodes[q]->refd_tbls[s])>0){
1533 int rf = name_node_map[qnodes[q]->refd_tbls[s]];
1534 qnodes[q]->reads_from.insert(rf);
1535 qnodes[rf]->sources_to.insert(q);
1540 // Rebuild the reads_from / sources_to lists in hfta_sets
1541 for(q=0;q<hfta_sets.size();++q){
1542 hfta_sets[q]->reads_from.clear();
1543 hfta_sets[q]->sources_to.clear();
1545 for(q=0;q<hfta_sets.size();++q){
1546 for(s=0;s<hfta_sets[q]->query_node_indices.size();++s){
1547 int node = hfta_sets[q]->query_node_indices[s];
1548 set<int>::iterator rfsii;
1549 for(rfsii=qnodes[node]->reads_from.begin();rfsii!=qnodes[node]->reads_from.end();++rfsii){
1550 if(hfta_name_map.count(qnodes[(*rfsii)]->name)>0){
1551 hfta_sets[q]->reads_from.insert(hfta_name_map[qnodes[(*rfsii)]->name]);
1552 hfta_sets[hfta_name_map[qnodes[(*rfsii)]->name]]->sources_to.insert(q);
1559 for(q=0;q<qnodes.size();++q){
1560 printf("qnode %d reads-from %d:",q,qnodes[q]->reads_from.size());
1561 set<int>::iterator rsii;
1562 for(rsii=qnodes[q]->reads_from.begin();rsii!=qnodes[q]->reads_from.end();++rsii)
1563 printf(" %d",(*rsii));
1564 printf(", and sources-to %d:",qnodes[q]->sources_to.size());
1565 for(rsii=qnodes[q]->sources_to.begin();rsii!=qnodes[q]->sources_to.end();++rsii)
1566 printf(" %d",(*rsii));
1570 for(q=0;q<hfta_sets.size();++q){
1571 if(hfta_sets[q]->do_generation==false)
1573 printf("hfta %d (%s) reads-from %d:",q,hfta_sets[q]->name.c_str(),hfta_sets[q]->reads_from.size());
1574 set<int>::iterator rsii;
1575 for(rsii=hfta_sets[q]->reads_from.begin();rsii!=hfta_sets[q]->reads_from.end();++rsii)
1576 printf(" %d",(*rsii));
1577 printf(", and sources-to %d:",hfta_sets[q]->sources_to.size());
1578 for(rsii=hfta_sets[q]->sources_to.begin();rsii!=hfta_sets[q]->sources_to.end();++rsii)
1579 printf(" %d",(*rsii));
1586 // Re-topo sort the hftas
1587 hfta_topsort.clear();
1589 int hnode_srcs_2[hfta_sets.size()];
1590 for(i=0;i<hfta_sets.size();++i){
1591 hnode_srcs_2[i] = 0;
1592 if(hfta_sets[i]->sources_to.size() == 0 && hfta_sets[i]->do_generation){
1597 while(workq.empty() == false){
1598 int node = workq.front();
1600 hfta_topsort.push_back(node);
1601 set<int>::iterator stsii;
1602 for(stsii=hfta_sets[node]->reads_from.begin();stsii!=hfta_sets[node]->reads_from.end();++stsii){
1603 int child = (*stsii);
1604 hnode_srcs_2[child]++;
1605 if(hnode_srcs_2[child] == hfta_sets[child]->sources_to.size()){
1606 workq.push_back(child);
1611 // Ensure that all of the query_node_indices in hfta_sets are topologically
1612 // sorted, don't rely on assumptions that all transforms maintain some kind of order.
1613 for(i=0;i<hfta_sets.size();++i){
1614 if(hfta_sets[i]->do_generation){
1615 map<int,int> n_accounted;
1616 vector<int> new_order;
1618 vector<int>::iterator vii;
1619 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1620 n_accounted[(*vii)]= 0;
1622 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1623 set<int>::iterator rfsii;
1624 for(rfsii=qnodes[(*vii)]->reads_from.begin();rfsii!=qnodes[(*vii)]->reads_from.end();++rfsii){
1625 if(n_accounted.count((*rfsii)) == 0){
1626 n_accounted[(*vii)]++;
1629 if(n_accounted[(*vii)] == qnodes[(*vii)]->reads_from.size()){
1630 workq.push_back((*vii));
1634 while(workq.empty() == false){
1635 int node = workq.front();
1637 new_order.push_back(node);
1638 set<int>::iterator stsii;
1639 for(stsii=qnodes[node]->sources_to.begin();stsii!=qnodes[node]->sources_to.end();++stsii){
1640 if(n_accounted.count((*stsii))){
1641 n_accounted[(*stsii)]++;
1642 if(n_accounted[(*stsii)] == qnodes[(*stsii)]->reads_from.size()){
1643 workq.push_back((*stsii));
1648 hfta_sets[i]->query_node_indices = new_order;
1656 /// Global checking is done, start the analysis and translation
1657 /// of the query parse tree in the order specified by process_order
1660 // Get a list of the LFTAs for global lfta optimization
1661 // TODO: separate building operators from splitting lftas,
1662 // that will make optimizations such as predicate pushing easier.
1663 vector<stream_query *> lfta_list;
1664 stream_query *rootq;
1667 map<string, map<int, string> > schema_of_schemaid; // ensure that schema IDs are unique to an <interface, schema>
1669 for(qi=hfta_topsort.size()-1;qi>=0;--qi){
1671 int hfta_id = hfta_topsort[qi];
1672 vector<int> curr_list = hfta_sets[hfta_id]->query_node_indices;
1676 // Two possibilities, either its a UDOP, or its a collection of queries.
1677 // if(qnodes[curr_list.back()]->is_udop)
1678 if(hfta_sets[hfta_id]->is_udop){
1679 int node_id = curr_list.back();
1680 int udop_schref = Schema->find_tbl(qnodes[node_id]->file);
1681 opview_entry *opv = new opview_entry();
1683 // Many of the UDOP properties aren't currently used.
1684 opv->parent_qname = "no_parent";
1685 opv->root_name = qnodes[node_id]->name;
1686 opv->view_name = qnodes[node_id]->file;
1688 sprintf(tmpstr,"%s_UDOP%d_%s",qnodes[node_id]->name.c_str(),qi,opv->view_name.c_str());
1689 opv->udop_alias = tmpstr;
1690 opv->mangler = qnodes[node_id]->mangler;
1692 if(opv->mangler != ""){
1693 int new_udop_schref = Schema->add_duplicate_table(opv->view_name,opv->root_name);
1694 Schema->mangle_subq_names(new_udop_schref,opv->mangler);
1697 // This piece of code makes each hfta which refers to the same udop
1698 // reference a distinct running udop. Do this at query optimization time?
1699 // fmtbl->set_udop_alias(opv->udop_alias);
1701 opv->exec_fl = Schema->get_op_prop(udop_schref, string("file"));
1702 opv->liveness_timeout = atoi(Schema->get_op_prop(udop_schref, string("liveness_timeout")).c_str());
1704 vector<subquery_spec *> subq = Schema->get_subqueryspecs(udop_schref);
1706 for(s=0;s<subq.size();++s){
1707 // Validate that the fields match.
1708 subquery_spec *sqs = subq[s];
1709 string subq_name = sqs->name + opv->mangler;
1710 vector<field_entry *> flds = Schema->get_fields(subq_name);
1711 if(flds.size() == 0){
1712 fprintf(stderr,"INTERNAL ERROR: the subquery %s of view %s not found in Schema.\n",subq_name.c_str(), opv->view_name.c_str());
1715 if(flds.size() < sqs->types.size()){
1716 fprintf(stderr,"ERROR: subquery %s of view %s does not have enough fields (%lu found, %lu expected).\n",subq_name.c_str(), opv->view_name.c_str(),flds.size(), sqs->types.size());
1719 bool failed = false;
1720 for(f=0;f<sqs->types.size();++f){
1721 data_type dte(sqs->types[f],sqs->modifiers[f]);
1722 data_type dtf(flds[f]->get_type(),flds[f]->get_modifier_list());
1723 if(! dte.subsumes_type(&dtf) ){
1724 fprintf(stderr,"ERROR: subquery %s of view %s does not have the correct type for field %d (%s found, %s expected).\n",subq_name.c_str(), opv->view_name.c_str(),f,dtf.to_string().c_str(), dte.to_string().c_str());
1728 if(dte.is_temporal() && (dte.get_temporal() != dtf.get_temporal()) ){
1729 string pstr = dte.get_temporal_string();
1730 fprintf(stderr,"ERROR: subquery %s of view %s does not have the expected temporal value %s of field %d.\n",sqs->name.c_str(), opv->view_name.c_str(),pstr.c_str(),f);
1737 /// Validation done, find the subquery, make a copy of the
1738 /// parse tree, and add it to the return list.
1739 for(q=0;q<qnodes.size();++q)
1740 if(qnodes[q]->name == subq_name)
1742 if(q==qnodes.size()){
1743 fprintf(stderr,"INTERNAL ERROR: subquery %s of view %s not found in list of query names.\n",subq_name.c_str(), opv->view_name.c_str());
1749 // Cross-link to from entry(s) in all sourced-to tables.
1750 set<int>::iterator sii;
1751 for(sii=qnodes[curr_list.back()]->sources_to.begin() ;sii!=qnodes[curr_list.back()]->sources_to.end();++sii){
1752 //printf("\tUDOP %s sources_to %d (%s)\n",hfta_sets[hfta_id]->name.c_str(),(*sii),hfta_sets[(*sii)]->name.c_str());
1753 vector<tablevar_t *> tblvars = qnodes[(*sii)]->parse_tree->get_from()->get_table_list();
1755 for(ii=0;ii<tblvars.size();++ii){
1756 if(tblvars[ii]->schema_name == opv->root_name){
1757 tblvars[ii]->set_opview_idx(opviews.size());
1763 opviews.append(opv);
1766 // Analyze the parse trees in this query,
1767 // put them in rootq
1768 // vector<int> curr_list = process_sets[qi];
1771 ////////////////////////////////////////
1774 //printf("Process set %d, has %d queries\n",qi,curr_list.size());
1775 for(qj=0;qj<curr_list.size();++qj){
1777 fprintf(stderr,"Processing query %s (file %s) is_udop = %d\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(),qnodes[i]->is_udop);
1779 // Select the current query parse tree
1780 table_exp_t *fta_parse_tree = qnodes[i]->parse_tree;
1782 // if hfta only, try to fetch any missing schemas
1783 // from the registry (using the print_schema program).
1784 // Here I use a hack to avoid analyzing the query -- all referenced
1785 // tables must be in the from clause
1786 // If there is a problem loading any table, just issue a warning,
1788 tablevar_list_t *fm = fta_parse_tree->get_from();
1789 vector<string> refd_tbls = fm->get_src_tbls(Schema);
1790 // iterate over all referenced tables
1792 for(t=0;t<refd_tbls.size();++t){
1793 int tbl_ref = Schema->get_table_ref(refd_tbls[t]);
1795 if(tbl_ref < 0){ // if this table is not in the Schema
1798 string cmd="print_schema "+refd_tbls[t];
1799 FILE *schema_in = popen(cmd.c_str(), "r");
1800 if(schema_in == NULL){
1801 fprintf(stderr,"ERROR: cannot execute command %s\n",cmd.c_str());
1803 string schema_instr;
1804 while(fgets(tmpstr,TMPSTRLEN,schema_in)){
1805 schema_instr += tmpstr;
1807 fta_parse_result = new fta_parse_t();
1808 strcpy(tmp_schema_str,schema_instr.c_str());
1809 FtaParser_setstringinput(tmp_schema_str);
1810 if(FtaParserparse()){
1811 fprintf(stderr,"FTA parse failed on result from command %s.\n",cmd.c_str());
1813 if( fta_parse_result->tables != NULL){
1815 for(tl=0;tl<fta_parse_result->tables->size();++tl){
1816 Schema->add_table(fta_parse_result->tables->get_table(tl));
1819 fprintf(stderr,"ERROR command %s returned no tables.\n",cmd.c_str());
1824 fprintf(stderr,"ERROR, query %s (file %s) references stream %s, which is neither a PROTOCOL nor an externally visible query stream.\n", qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), refd_tbls[t].c_str());
1832 // Analyze the query.
1833 query_summary_class *qs = analyze_fta(fta_parse_tree, Schema, Ext_fcns, qnodes[i]->name);
1835 fprintf(stderr,"analyze_fta failed on query %s (file %s) \n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str());
1839 stream_query new_sq(qs, Schema);
1840 if(new_sq.error_code){
1841 fprintf(stderr,"ERROR, can't create query node for query %s (file %s):\n%s\n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str(),new_sq.err_str.c_str());
1845 // Add it to the Schema
1846 table_def *output_td = new_sq.get_output_tabledef();
1847 Schema->add_table(output_td);
1849 // Create a query plan from the analyzed parse tree.
1850 // If it's a query referenced via FROM, add it to the stream query.
1852 rootq->add_query(new_sq);
1854 rootq = new stream_query(new_sq);
1855 // have the stream query object inherit properties from the analyzed
1856 // hfta_node object.
1857 rootq->set_nparallel(hfta_sets[hfta_id]->n_parallel,hfta_sets[hfta_id]->parallel_idx);
1858 rootq->n_successors = hfta_sets[hfta_id]->sources_to.size();
1864 // This stream query has all its parts
1865 // Build and optimize it.
1866 //printf("translate_fta: generating plan.\n");
1867 if(rootq->generate_plan(Schema)){
1868 fprintf(stderr,"INTERNAL ERROR: Can't generate query plan for query %s.\n",rootq->query_name.c_str());
1872 // If we've found the query plan head, so now add the output operators
1873 if(qname_to_ospec.count(hfta_sets[hfta_id]->source_name)){
1874 pair< multimap<string, int>::iterator, multimap<string, int>::iterator > oset;
1875 multimap<string, int>::iterator mmsi;
1876 oset = qname_to_ospec.equal_range(hfta_sets[hfta_id]->source_name);
1877 for(mmsi=oset.first; mmsi!=oset.second; ++mmsi){
1878 rootq->add_output_operator(output_specs[(*mmsi).second]);
1884 // Perform query splitting if necessary.
1886 vector<stream_query *> split_queries = rootq->split_query(Ext_fcns, Schema, hfta_returned, ifaces_db, n_virtual_interfaces, hfta_sets[hfta_id]->n_parallel, hfta_sets[hfta_id]->parallel_idx);
1889 //for(l=0;l<split_queries.size();++l){
1890 //printf("split query %d is %s\n",l,split_queries[l]->query_name.c_str());
1896 if(split_queries.size() > 0){ // should be at least one component.
1898 // Compute the number of LFTAs.
1899 int n_lfta = split_queries.size();
1900 if(hfta_returned) n_lfta--;
1901 // Check if a schemaId constraint needs to be inserted.
1903 // Process the LFTA components.
1904 for(l=0;l<n_lfta;++l){
1905 if(lfta_names.count(split_queries[l]->query_name) == 0){
1906 // Grab the lfta for global optimization.
1907 vector<tablevar_t *> tvec = split_queries[l]->query_plan[0]->get_input_tbls();
1908 string liface = "_local_";
1909 // string lmach = "";
1910 string lmach = hostname;
1912 liface = tvec[0]->get_interface(); // iface queries have been resolved
1913 if(tvec[0]->get_machine() != ""){
1914 lmach = tvec[0]->get_machine();
1916 fprintf(stderr,"WARNING, lfta %s has empty machine name, using %s\n", split_queries[l]->query_plan[0]->node_name.c_str(), hostname.c_str());
1919 interface_names.push_back(liface);
1920 machine_names.push_back(lmach);
1923 vector<predicate_t *> schemaid_preds;
1924 for(int irv=0;irv<tvec.size();++irv){
1926 string schema_name = tvec[irv]->get_schema_name();
1927 string rvar_name = tvec[irv]->get_var_name();
1928 int schema_ref = tvec[irv]->get_schema_ref();
1931 // interface_names.push_back(liface);
1932 // machine_names.push_back(lmach);
1934 //printf("Machine is %s\n",lmach.c_str());
1936 // Check if a schemaId constraint needs to be inserted.
1937 if(schema_ref<0){ // can result from some kinds of splits
1938 schema_ref = Schema->get_table_ref(schema_name);
1940 int schema_id = Schema->get_schema_id(schema_ref); // id associated with PROTOCOL
1943 iface_t *iface = ifaces_db-> get_interface(lmach, liface, errnum, if_error);
1945 fprintf(stderr,"Error looking up the interface %s on host %s: %s\n",liface.c_str(), lmach.c_str(), if_error.c_str());
1950 if(tvec[irv]->get_interface() != "_local_"){
1951 if(iface->has_multiple_schemas()){
1952 if(schema_id<0){ // invalid schema_id
1953 fprintf(stderr,"Error, schema %s has no schema_id, but is sourced from multi-schema interface %s (ref'd in query %s)\n", schema_name.c_str(), liface.c_str(), split_queries[0]->query_name.c_str());
1956 vector<string> iface_schemas = iface->get_property("Schemas");
1957 if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
1958 fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
1961 // Ensure that in liface, schema_id is used for only one schema
1962 if(schema_of_schemaid.count(liface)==0){
1963 map<int, string> empty_map;
1964 schema_of_schemaid[liface] = empty_map;
1966 if(schema_of_schemaid[liface].count(schema_id)==0){
1967 schema_of_schemaid[liface][schema_id] = schema_name;
1969 if(schema_of_schemaid[liface][schema_id] != schema_name){
1970 fprintf(stderr, "ERROR, For interface %s, schema id %d is used in schemas %s and %s\n", liface.c_str(), schema_id, schema_name.c_str(), schema_of_schemaid[liface][schema_id].c_str());
1974 }else{ // single-schema interface
1975 schema_id = -1; // don't generate schema_id predicate
1976 vector<string> iface_schemas = iface->get_property("Schemas");
1977 if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
1978 fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
1981 if(iface_schemas.size()>1){
1982 fprintf(stderr, "ERROR, interface %s is a single-schema interface, but has %d elements in its Schemas list.\n", liface.c_str(), (int)iface_schemas.size());
1990 // If we need to check the schema_id, insert a predicate into the lfta.
1991 // TODO not just schema_id, the full all_schema_ids set.
1993 colref_t *schid_cr = new colref_t("schemaId");
1994 schid_cr->schema_ref = schema_ref;
1995 schid_cr->table_name = rvar_name;
1996 schid_cr->tablevar_ref = 0;
1997 schid_cr->default_table = false;
1998 scalarexp_t *schid_se = new scalarexp_t(schid_cr);
1999 data_type *schid_dt = new data_type("uint");
2000 schid_se->dt = schid_dt;
2002 string schid_str = int_to_string(schema_id);
2003 literal_t *schid_lit = new literal_t(schid_str.c_str(), LITERAL_INT);
2004 scalarexp_t *lit_se = new scalarexp_t(schid_lit);
2005 lit_se->dt = schid_dt;
2007 predicate_t *schid_pr = new predicate_t(schid_se, "=", lit_se);
2008 vector<cnf_elem *> clist;
2009 make_cnf_from_pr(schid_pr, clist);
2010 analyze_cnf(clist[0]);
2011 clist[0]->cost = 1; // cheap one comparison
2012 // cnf built, now insert it.
2013 split_queries[l]->query_plan[0]->append_to_where(clist[0]);
2015 // Specialized processing
2016 // filter join, get two schemaid preds
2017 string node_type = split_queries[l]->query_plan[0]->node_type();
2018 if(node_type == "filter_join"){
2019 filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
2021 fj->pred_t0.push_back(clist[0]);
2023 fj->pred_t1.push_back(clist[0]);
2025 schemaid_preds.push_back(schid_pr);
2027 // watchlist join, get the first schemaid pred
2028 if(node_type == "watch_join"){
2029 watch_join_qpn *fj = (watch_join_qpn *)split_queries[l]->query_plan[0];
2031 fj->pred_t0.push_back(clist[0]);
2032 schemaid_preds.push_back(schid_pr);
2037 // Specialized processing, currently filter join.
2038 if(schemaid_preds.size()>1){
2039 string node_type = split_queries[l]->query_plan[0]->node_type();
2040 if(node_type == "filter_join"){
2041 filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
2042 predicate_t *filter_pr = new predicate_t("OR", schemaid_preds[0], schemaid_preds[1]);
2043 vector<cnf_elem *> clist;
2044 make_cnf_from_pr(filter_pr, clist);
2045 analyze_cnf(clist[0]);
2046 clist[0]->cost = 1; // cheap one comparison
2047 fj->shared_pred.push_back(clist[0]);
2057 // Set the ht size from the recommendation, if there is one in the rec file
2058 if(lfta_htsize.count(split_queries[l]->query_name)>0){
2059 split_queries[l]->query_plan[0]->set_definition("aggregate_slots",int_to_string(lfta_htsize[split_queries[l]->query_name]));
2063 lfta_names[split_queries[l]->query_name] = lfta_list.size();
2064 split_queries[l]->set_gid(lfta_list.size()); // set lfta global id
2065 lfta_list.push_back(split_queries[l]);
2066 lfta_mach_lists[lmach].push_back(split_queries[l]);
2068 // The following is a hack,
2069 // as I should be generating LFTA code through
2070 // the stream_query object.
2072 split_queries[l]->query_plan[0]->bind_to_schema(Schema);
2074 // split_queries[l]->query_plan[0]->definitions = split_queries[l]->defines;
2077 // Create query description to embed in lfta.c
2078 string lfta_schema_str = split_queries[l]->make_schema();
2079 string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
2081 // get NIC capabilities.
2083 nic_property *nicprop = NULL;
2084 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2085 if(iface_codegen_type.size()){
2086 nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2088 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2093 lfta_val[lmach] += generate_lfta_block(split_queries[l]->query_plan[0], Schema, split_queries[l]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop);
2096 snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema));
2098 // TODO NOTE : I'd like it to be the case that registration_query_names
2099 // are the queries to be registered for subscription.
2100 // but there is some complex bookkeeping here.
2101 registration_query_names.push_back(split_queries[l]->query_name);
2102 mach_query_names[lmach].push_back(registration_query_names.size()-1);
2103 // NOTE: I will assume a 1-1 correspondence between
2104 // mach_query_names[lmach] and lfta_mach_lists[lmach]
2105 // where mach_query_names[lmach][i] contains the index into
2106 // query_names, which names the lfta, and
2107 // mach_query_names[lmach][i] is the stream_query * of the
2108 // corresponding lfta.
2109 // Later, lfta_iface_qnames are the query names matching lfta_iface_lists
2113 // check if lfta is reusable
2114 // FTA is reusable if reusable option is set explicitly or it doesn't have any parameters
2116 bool lfta_reusable = false;
2117 if (split_queries[l]->query_plan[0]->get_val_of_def("reusable") == "yes" ||
2118 split_queries[l]->query_plan[0]->get_param_tbl()->size() == 0) {
2119 lfta_reusable = true;
2121 lfta_reuse_options.push_back(lfta_reusable);
2123 // LFTA will inherit the liveness timeout specification from the containing query
2124 // it is too conservative as lfta are expected to spend less time per tuple
2127 // extract liveness timeout from query definition
2128 int liveness_timeout = atoi(split_queries[l]->query_plan[0]->get_val_of_def("liveness_timeout").c_str());
2129 if (!liveness_timeout) {
2130 // fprintf(stderr, "WARNING: No liveness timeout specified for lfta %s, using default value of %d\n",
2131 // split_queries[l]->query_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2132 liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2134 lfta_liveness_timeouts.push_back(liveness_timeout);
2136 // Add it to the schema
2137 table_def *td = split_queries[l]->get_output_tabledef();
2138 Schema->append_table(td);
2139 //printf("added lfta %d (%s)\n",l,split_queries[l]->query_name.c_str());
2144 // If the output is lfta-only, dump out the query name.
2145 if(split_queries.size() == 1 && !hfta_returned){
2146 if(output_query_names ){
2147 fprintf(query_name_output,"%s L\n",split_queries[0]->query_name.c_str());
2151 fprintf(stderr,"query name is %s\n",split_queries[0]->query_name.c_str());
2156 // output schema summary
2157 if(output_schema_summary){
2158 dump_summary(split_queries[0]);
2164 if(hfta_returned){ // query also has an HFTA component
2165 int hfta_nbr = split_queries.size()-1;
2167 hfta_list.push_back(split_queries[hfta_nbr]);
2169 // report on generated query names
2170 if(output_query_names){
2171 string hfta_name =split_queries[hfta_nbr]->query_name;
2172 fprintf(query_name_output,"%s H\n",hfta_name.c_str());
2173 for(l=0;l<hfta_nbr;++l){
2174 string lfta_name =split_queries[l]->query_name;
2175 fprintf(query_name_output,"%s L\n",lfta_name.c_str());
2179 // fprintf(stderr,"query names are ");
2180 // for(l=0;l<hfta_nbr;++l){
2181 // if(l>0) fprintf(stderr,",");
2182 // string fta_name =split_queries[l]->query_name;
2183 // fprintf(stderr," %s",fta_name.c_str());
2185 // fprintf(stderr,"\n");
2190 fprintf(stderr,"ERROR, query in file %s has a bug.\n",qnodes[i]->file.c_str());
2191 fprintf(stderr,"%s\n",rootq->get_error_str().c_str());
2198 //-----------------------------------------------------------------
2199 // Compute and propagate the SE in PROTOCOL fields compute a field.
2200 //-----------------------------------------------------------------
2202 for(i=0;i<lfta_list.size();i++){
2203 lfta_list[i]->generate_protocol_se(sq_map, Schema);
2204 sq_map[lfta_list[i]->get_sq_name()] = lfta_list[i];
2206 for(i=0;i<hfta_list.size();i++){
2207 hfta_list[i]->generate_protocol_se(sq_map, Schema);
2208 sq_map[hfta_list[i]->get_sq_name()] = hfta_list[i];
2213 //------------------------------------------------------------------------
2214 // Perform individual FTA optimizations
2215 //-----------------------------------------------------------------------
2217 if (partitioned_mode) {
2219 // open partition definition file
2220 string part_fname = config_dir_path + "partition.txt";
2222 FILE* partfd = fopen(part_fname.c_str(), "r");
2224 fprintf(stderr,"ERROR, unable to open partition definition file %s\n",part_fname.c_str());
2227 PartnParser_setfileinput(partfd);
2228 if (PartnParserparse()) {
2229 fprintf(stderr,"ERROR, unable to parse partition definition file %s\n",part_fname.c_str());
2236 print_hftas("preopt_hfta_info.txt", hfta_list, lfta_names, registration_query_names, interface_names);
2238 int num_hfta = hfta_list.size();
2239 for(i=0; i < hfta_list.size(); ++i){
2240 hfta_list[i]->optimize(hfta_list, lfta_names, interface_names, machine_names, Ext_fcns, Schema, ifaces_db, partn_parse_result);
2243 // Add all new hftas to schema
2244 for(i=num_hfta; i < hfta_list.size(); ++i){
2245 table_def *td = hfta_list[i]->get_output_tabledef();
2246 Schema->append_table(td);
2249 print_hftas("postopt_hfta_info.txt", hfta_list, lfta_names, registration_query_names, interface_names);
2253 //------------------------------------------------------------------------
2254 // Do global (cross-fta) optimization
2255 //-----------------------------------------------------------------------
2262 set<string> extra_external_libs;
2264 for(i=0;i<hfta_list.size();++i){ // query also has an HFTA component
2267 // build hfta file name, create output
2268 if(numeric_hfta_flname){
2269 sprintf(tmpstr,"hfta_%d",hfta_count);
2270 hfta_names.push_back(tmpstr);
2271 sprintf(tmpstr,"hfta_%d.cc",hfta_count);
2273 sprintf(tmpstr,"hfta_%s",hfta_list[i]->query_name.c_str());
2274 hfta_names.push_back(tmpstr);
2275 sprintf(tmpstr,"hfta_%s.cc",hfta_list[i]->query_name.c_str());
2277 FILE *hfta_fl = fopen(tmpstr,"w");
2278 if(hfta_fl == NULL){
2279 fprintf(stderr,"ERROR can't open fta output file %s\n",tmpstr);
2282 fprintf(hfta_fl,"%s\n\n",hfta_list[i]->generate_hfta(Schema, Ext_fcns, opviews, distributed_mode).c_str());
2284 // If there is a field verifier, warn about
2285 // lack of compatibility
2286 // NOTE : this code assumes that visible non-lfta queries
2287 // are those at the root of a stream query.
2288 string hfta_comment;
2290 string hfta_namespace;
2291 if(hfta_list[i]->defines.count("comment")>0)
2292 hfta_comment = hfta_list[i]->defines["comment"];
2293 if(hfta_list[i]->defines.count("Comment")>0)
2294 hfta_comment = hfta_list[i]->defines["Comment"];
2295 if(hfta_list[i]->defines.count("COMMENT")>0)
2296 hfta_comment = hfta_list[i]->defines["COMMENT"];
2297 if(hfta_list[i]->defines.count("title")>0)
2298 hfta_title = hfta_list[i]->defines["title"];
2299 if(hfta_list[i]->defines.count("Title")>0)
2300 hfta_title = hfta_list[i]->defines["Title"];
2301 if(hfta_list[i]->defines.count("TITLE")>0)
2302 hfta_title = hfta_list[i]->defines["TITLE"];
2303 if(hfta_list[i]->defines.count("namespace")>0)
2304 hfta_namespace = hfta_list[i]->defines["namespace"];
2305 if(hfta_list[i]->defines.count("Namespace")>0)
2306 hfta_namespace = hfta_list[i]->defines["Namespace"];
2307 if(hfta_list[i]->defines.count("NAMESPACE")>0)
2308 hfta_namespace = hfta_list[i]->defines["NAMESPACE"];
2310 if(field_verifier != NULL){
2312 if(hfta_comment == "")
2313 warning_str += "\tcomment not found.\n";
2315 // Obsolete stuff that Carsten wanted
2316 // if(hfta_title == "")
2317 // warning_str += "\ttitle not found.\n";
2318 // if(hfta_namespace == "")
2319 // warning_str += "\tnamespace not found.\n";
2321 // There is a get_tbl_keys method implemented for qp_nodes,
2322 // integrate it into stream_query, then call it to find keys,
2323 // and annotate fields with their key-ness.
2324 // If there is a "keys" property in the defines block, override anything returned
2325 // from the automated analysis
2327 vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2329 for(fi=0;fi<flds.size();fi++){
2330 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
2332 if(warning_str != "")
2333 fprintf(stderr,"Warning, in HFTA stream %s:\n%s",
2334 hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(),warning_str.c_str());
2337 // Get the fields in this query
2338 vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2340 // do key processing
2341 string hfta_keys_s = "";
2342 if(hfta_list[i]->defines.count("keys")>0)
2343 hfta_keys_s = hfta_list[i]->defines["keys"];
2344 if(hfta_list[i]->defines.count("Keys")>0)
2345 hfta_keys_s = hfta_list[i]->defines["Keys"];
2346 if(hfta_list[i]->defines.count("KEYS")>0)
2347 hfta_keys_s = hfta_list[i]->defines["KEYS"];
2348 string xtra_keys_s = "";
2349 if(hfta_list[i]->defines.count("extra_keys")>0)
2350 xtra_keys_s = hfta_list[i]->defines["extra_keys"];
2351 if(hfta_list[i]->defines.count("Extra_Keys")>0)
2352 xtra_keys_s = hfta_list[i]->defines["Extra_Keys"];
2353 if(hfta_list[i]->defines.count("EXTRA_KEYS")>0)
2354 xtra_keys_s = hfta_list[i]->defines["EXTRA_KEYS"];
2356 vector<string> hfta_keys;
2357 vector<string> partial_keys;
2358 vector<string> xtra_keys;
2359 if(hfta_keys_s==""){
2360 hfta_keys = hfta_list[i]->get_tbl_keys(partial_keys);
2361 if(xtra_keys_s.size()>0){
2362 xtra_keys = split_string(xtra_keys_s, ',');
2364 for(int xi=0;xi<xtra_keys.size();++xi){
2365 if(find(hfta_keys.begin(), hfta_keys.end(), xtra_keys[xi]) == hfta_keys.end()){
2366 hfta_keys.push_back(xtra_keys[xi]);
2370 hfta_keys = split_string(hfta_keys_s, ',');
2372 // validate that all of the keys exist in the output.
2373 // (exit on error, as it's a bad specification)
2374 vector<string> missing_keys;
2375 for(int ki=0;ki<hfta_keys.size(); ++ki){
2377 for(fi=0;fi<flds.size();++fi){
2378 if(hfta_keys[ki] == flds[fi]->get_name())
2382 missing_keys.push_back(hfta_keys[ki]);
2384 if(missing_keys.size()>0){
2385 fprintf(stderr, "Error, HFTA stream %s has keys defined which aren't in the output:", hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
2386 for(int hi=0; hi<missing_keys.size(); ++hi){
2387 fprintf(stderr," %s", missing_keys[hi].c_str());
2389 fprintf(stderr,"\n");
2393 fprintf(qtree_output,"\t<HFTA name='%s'>\n",hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
2394 if(hfta_comment != "")
2395 fprintf(qtree_output,"\t\t<Description value='%s' />\n",hfta_comment.c_str());
2396 if(hfta_title != "")
2397 fprintf(qtree_output,"\t\t<Title value='%s' />\n",hfta_title.c_str());
2398 if(hfta_namespace != "")
2399 fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",hfta_namespace.c_str());
2400 fprintf(qtree_output,"\t\t<FileName value='%s' />\n",tmpstr);
2401 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2403 // write info about fields to qtree.xml
2405 for(fi=0;fi<flds.size();fi++){
2406 fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
2407 if(flds[fi]->get_modifier_list()->size()){
2408 fprintf(qtree_output,"mods='%s'", flds[fi]->get_modifier_list()->to_string().c_str());
2410 fprintf(qtree_output," />\n");
2413 for(int hi=0;hi<hfta_keys.size();++hi){
2414 fprintf(qtree_output, "\t\t<Key value='%s'/>\n",hfta_keys[hi].c_str());
2416 for(int hi=0;hi<partial_keys.size();++hi){
2417 fprintf(qtree_output, "\t\t<PartialKey value='%s'/>\n",partial_keys[hi].c_str());
2419 for(int hi=0;hi<xtra_keys.size();++hi){
2420 fprintf(qtree_output, "\t\t<ExtraKey value='%s'/>\n",xtra_keys[hi].c_str());
2424 // extract liveness timeout from query definition
2425 int liveness_timeout = atoi(hfta_list[i]->query_plan[hfta_list[i]->qhead]->get_val_of_def("liveness_timeout").c_str());
2426 if (!liveness_timeout) {
2427 // fprintf(stderr, "WARNING: No liveness timeout specified for hfta %s, using default value of %d\n",
2428 // hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2429 liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2431 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", liveness_timeout);
2433 vector<tablevar_t *> tmp_tv = hfta_list[i]->get_input_tables();
2435 for(itv=0;itv<tmp_tv.size();++itv){
2436 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",tmp_tv[itv]->get_schema_name().c_str());
2438 string ifrs = hfta_list[i]->collect_refd_ifaces();
2440 fprintf(qtree_output,"\t\t<Ifaces_referenced value='%s'/>\n",ifrs.c_str());
2442 fprintf(qtree_output,"\t</HFTA>\n");
2446 // debug only -- do code generation to catch generation-time errors.
2447 hfta_list[i]->generate_hfta(Schema, Ext_fcns,opviews, distributed_mode);
2450 hfta_count++; // for hfta file names with numeric suffixes
2452 hfta_list[i]->get_external_libs(extra_external_libs);
2456 string ext_lib_string;
2457 set<string>::iterator ssi_el;
2458 for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_el)
2459 ext_lib_string += (*ssi_el)+" ";
2463 // Report on the set of operator views
2464 for(i=0;i<opviews.size();++i){
2465 opview_entry *opve = opviews.get_entry(i);
2466 fprintf(qtree_output,"\t<UDOP name='%s' >\n",opve->view_name.c_str());
2467 fprintf(qtree_output,"\t\t<FileName value='%s' />\n",opve->exec_fl.c_str());
2468 fprintf(qtree_output,"\t\t<Parent value='%s' />\n",opve->root_name.c_str());
2469 fprintf(qtree_output,"\t\t<Alias value='%s' />\n",opve->udop_alias.c_str());
2470 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2472 if (!opve->liveness_timeout) {
2473 // fprintf(stderr, "WARNING: No liveness timeout specified for view %s, using default value of %d\n",
2474 // opve->view_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2475 opve->liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2477 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", opve->liveness_timeout);
2479 for(j=0;j<opve->subq_names.size();j++)
2480 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",opve->subq_names[j].c_str());
2481 fprintf(qtree_output,"\t</UDOP>\n");
2485 //-----------------------------------------------------------------
2487 // Create interface-specific meta code files.
2488 // first, open and parse the interface resources file.
2489 ifaces_db = new ifq_t();
2491 if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
2492 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
2493 ifx_fname.c_str(), ierr.c_str());
2497 map<string, vector<stream_query *> >::iterator svsi;
2498 for(svsi=lfta_mach_lists.begin(); svsi!=lfta_mach_lists.end(); ++svsi){
2499 string lmach = (*svsi).first;
2501 // For this machine, create a set of lftas per interface.
2502 vector<stream_query *> mach_lftas = (*svsi).second;
2503 map<string, vector<stream_query *> > lfta_iface_lists;
2505 for(li=0;li<mach_lftas.size();++li){
2506 vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
2507 string lfta_iface = "_local_";
2509 string lfta_iface = tvec[0]->get_interface();
2511 lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2514 map<string, vector<stream_query *> >::iterator lsvsi;
2515 for(lsvsi=lfta_iface_lists.begin(); lsvsi!=lfta_iface_lists.end(); ++lsvsi){
2517 string liface = (*lsvsi).first;
2518 vector<stream_query *> iface_lftas = (*lsvsi).second;
2519 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2520 if(iface_codegen_type.size()){
2521 nic_property *nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2523 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2526 string mcs = generate_nic_code(iface_lftas, nicprop);
2529 mcf_flnm = lmach + "_"+liface+".mcf";
2531 mcf_flnm = hostname + "_"+liface+".mcf";
2533 if((mcf_fl = fopen(mcf_flnm.c_str(),"w")) == NULL){
2534 fprintf(stderr,"Can't open output file %s\n%s\n",mcf_flnm.c_str(),strerror(errno));
2537 fprintf(mcf_fl,"%s",mcs.c_str());
2539 //printf("mcs of machine %s, iface %s of type %s is \n%s\n",
2540 //lmach.c_str(), liface.c_str(),iface_codegen_type[0].c_str(), mcs.c_str());
2549 //-----------------------------------------------------------------
2552 // Find common filter predicates in the LFTAs.
2553 // in addition generate structs to store the temporal attributes unpacked by prefilter
2555 map<string, vector<stream_query *> >::iterator ssqi;
2556 for(ssqi=lfta_mach_lists.begin(); ssqi!=lfta_mach_lists.end(); ++ssqi){
2558 string lmach = (*ssqi).first;
2559 bool packed_return = false;
2563 // The LFTAs of this machine.
2564 vector<stream_query *> mach_lftas = (*ssqi).second;
2565 // break up on a per-interface basis.
2566 map<string, vector<stream_query *> > lfta_iface_lists;
2567 map<string, vector<int> > lfta_iface_qname_ix; // need the query name
2569 for(li=0;li<mach_lftas.size();++li){
2570 vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
2571 string lfta_iface = "_local_";
2573 lfta_iface = tvec[0]->get_interface();
2575 lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2576 lfta_iface_qname_ix[lfta_iface].push_back(mach_query_names[lmach][li]);
2580 // Are the return values "packed"?
2581 // This should be done on a per-interface basis.
2582 // But this is defunct code for gs-lite
2583 for(li=0;li<mach_lftas.size();++li){
2584 vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
2585 string liface = "_local_";
2587 liface = tvec[0]->get_interface();
2589 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2590 if(iface_codegen_type.size()){
2591 if(npdb->get_nic_prop_val(iface_codegen_type[0],"Return",erri) == "Packed"){
2592 packed_return = true;
2598 // Separate lftas by interface, collect results on a per-interface basis.
2600 vector<cnf_set *> no_preds; // fallback if there is no prefilter
2601 map<string, vector<cnf_set *> > prefilter_preds;
2602 set<unsigned int> pred_ids; // this can be global for all interfaces
2603 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2604 string liface = (*mvsi).first;
2605 vector<cnf_set *> empty_list;
2606 prefilter_preds[liface] = empty_list;
2607 if(! packed_return){
2608 get_common_lfta_filter(lfta_iface_lists[liface], Schema,Ext_fcns, prefilter_preds[liface], pred_ids);
2611 // get NIC capabilities. (Is this needed?)
2612 nic_property *nicprop = NULL;
2613 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2614 if(iface_codegen_type.size()){
2615 nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2617 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2624 // Now that we know the prefilter preds, generate the lfta code.
2625 // Do this for all lftas in this machine.
2626 for(li=0;li<mach_lftas.size();++li){
2627 set<unsigned int> subsumed_preds;
2628 set<unsigned int>::iterator sii;
2630 for(sii=pred_ids.begin();sii!=pred_ids.end();++sii){
2632 if((pid>>16) == li){
2633 subsumed_preds.insert(pid & 0xffff);
2637 string lfta_schema_str = mach_lftas[li]->make_schema();
2638 string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
2639 nic_property *nicprop = NULL; // no NIC properties?
2640 lfta_val[lmach] += generate_lfta_block(mach_lftas[li]->query_plan[0], Schema, mach_lftas[li]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop,subsumed_preds);
2644 // generate structs to store the temporal attributes
2645 // unpacked by prefilter
2646 col_id_set temp_cids;
2647 get_prefilter_temporal_cids(lfta_mach_lists[lmach], temp_cids);
2648 lfta_prefilter_val[lmach] = generate_lfta_prefilter_struct(temp_cids, Schema);
2650 // Compute the lfta bit signatures and the lfta colrefs
2651 // do this on a per-interface basis
2653 lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2655 map<string, vector<long long int> > lfta_sigs; // used again later
2656 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2657 string liface = (*mvsi).first;
2658 vector<long long int> empty_list;
2659 lfta_sigs[liface] = empty_list;
2661 vector<col_id_set> lfta_cols;
2662 vector<int> lfta_snap_length;
2663 for(li=0;li<lfta_iface_lists[liface].size();++li){
2664 unsigned long long int mask=0, bpos=1;
2666 for(f_pos=0;f_pos<prefilter_preds[liface].size();++f_pos){
2667 if(prefilter_preds[liface][f_pos]->lfta_id.count(li))
2671 lfta_sigs[liface].push_back(mask);
2672 lfta_cols.push_back(lfta_iface_lists[liface][li]->query_plan[0]->get_colrefs(true,Schema));
2673 lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema));
2676 //for(li=0;li<mach_lftas.size();++li){
2677 //printf("lfta %d, msak is %llu\n",li,lfta_sigs[li]);
2678 //col_id_set::iterator tcisi;
2679 //for(tcisi=lfta_cols[li].begin(); tcisi!=lfta_cols[li].end();++tcisi){
2680 //printf("\t%s %d (%d)\n",(*tcisi).field.c_str(),(*tcisi).schema_ref, (*tcisi).tblvar_ref);
2685 // generate the prefilter
2686 // Do this on a per-interface basis, except for the #define
2688 // lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2689 lfta_val[lmach] += generate_lfta_prefilter(prefilter_preds[liface], temp_cids, Schema, Ext_fcns, lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2691 lfta_val[lmach] += generate_lfta_prefilter(no_preds, temp_cids, Schema,Ext_fcns, lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2696 // Generate interface parameter lookup function
2697 lfta_val[lmach] += "// lookup interface properties by name\n";
2698 lfta_val[lmach] += "// mulitple values of given property returned in the same string separated by commas\n";
2699 lfta_val[lmach] += "// returns NULL if given property does not exist\n";
2700 lfta_val[lmach] += "gs_sp_t get_iface_properties (const gs_sp_t iface_name, const gs_sp_t property_name) {\n";
2702 // collect a list of interface names used by queries running on this host
2703 set<std::string> iface_names;
2704 for(i=0;i<mach_query_names[lmach].size();i++){
2705 int mi = mach_query_names[lmach][i];
2706 stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2708 if(interface_names[mi]=="")
2709 iface_names.insert("DEFAULTDEV");
2711 iface_names.insert(interface_names[mi]);
2714 // generate interface property lookup code for every interface
2715 set<std::string>::iterator sir;
2716 for (sir = iface_names.begin(); sir != iface_names.end(); sir++) {
2717 if (sir == iface_names.begin())
2718 lfta_val[lmach] += "\tif (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2720 lfta_val[lmach] += "\t} else if (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2722 // iterate through interface properties
2723 vector<string> iface_properties;
2724 if(*sir!="_local_"){ // dummy watchlist interface, don't process.
2725 iface_properties = ifaces_db->get_iface_properties(lmach,*sir,erri,err_str);
2728 fprintf(stderr,"ERROR cannot retrieve interface properties for %s.%s, %s\n",lmach.c_str(), sir->c_str(), err_str.c_str());
2731 if (iface_properties.empty())
2732 lfta_val[lmach] += "\t\treturn NULL;\n";
2734 for (int i = 0; i < iface_properties.size(); ++i) {
2736 lfta_val[lmach] += "\t\tif (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2738 lfta_val[lmach] += "\t\t} else if (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2740 // combine all values for the interface property using comma separator
2741 vector<string> vals = ifaces_db->get_iface_vals(lmach,*sir,iface_properties[i],erri,err_str);
2742 lfta_val[lmach] += "\t\t\treturn \"";
2743 for (int j = 0; j < vals.size(); ++j) {
2744 lfta_val[lmach] += vals[j];
2745 if (j != vals.size()-1)
2746 lfta_val[lmach] += ",";
2748 lfta_val[lmach] += "\";\n";
2750 lfta_val[lmach] += "\t\t} else\n";
2751 lfta_val[lmach] += "\t\t\treturn NULL;\n";
2754 lfta_val[lmach] += "\t} else\n";
2755 lfta_val[lmach] += "\t\treturn NULL;\n";
2756 lfta_val[lmach] += "}\n\n";
2759 // Generate a full list of FTAs for clearinghouse reference
2760 lfta_val[lmach] += "// list of FTAs clearinghouse expects to register themselves\n";
2761 lfta_val[lmach] += "const gs_sp_t fta_names[] = {";
2764 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2765 string liface = (*mvsi).first;
2766 if(liface != "_local_"){ // these don't register themselves
2767 vector<stream_query *> lfta_list = (*mvsi).second;
2768 for(i=0;i<lfta_list.size();i++){
2769 int mi = lfta_iface_qname_ix[liface][i];
2770 if(first) first = false;
2771 else lfta_val[lmach] += ", ";
2772 lfta_val[lmach] += "\"" + registration_query_names[mi] + "\"";
2776 // for (i = 0; i < registration_query_names.size(); ++i) {
2778 // lfta_val[lmach] += ", ";
2779 // lfta_val[lmach] += "\"" + registration_query_names[i] + "\"";
2782 for (i = 0; i < hfta_list.size(); ++i) {
2783 lfta_val[lmach] += ", \"" + hfta_list[i]->query_name + "\"";
2785 lfta_val[lmach] += ", NULL};\n\n";
2788 // Add the initialization function to lfta.c
2789 // Change to accept the interface name, and
2790 // set the prefilter function accordingly.
2791 // see the example in demo/err2
2792 lfta_val[lmach] += "void fta_init(gs_sp_t device){\n";
2793 lfta_val[lmach] += "// note: the last parameter in fta_register is the prefilter signature\n";
2795 // for(i=0;i<mach_query_names[lmach].size();i++)
2796 // int mi = mach_query_names[lmach][i];
2797 // stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2799 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2800 string liface = (*mvsi).first;
2801 vector<stream_query *> lfta_list = (*mvsi).second;
2802 for(i=0;i<lfta_list.size();i++){
2803 stream_query *lfta_sq = lfta_list[i];
2804 int mi = lfta_iface_qname_ix[liface][i];
2806 if(liface == "_local_"){
2807 // Don't register an init function, do the init code inline
2808 lfta_val[lmach] += "// initialization code for "+registration_query_names[mi]+";\n";
2809 lfta_val[lmach] += "\t"+generate_watchlist_name(registration_query_names[mi])+".filename = "+generate_watchlist_name(registration_query_names[mi])+"__fstr;\n";
2813 fprintf(stderr,"interface %s, LFTA %d, snap length is %d\n",liface.c_str(),mi,snap_lengths[mi]);
2815 string this_iface = "DEFAULTDEV";
2816 if(interface_names[mi]!="")
2817 this_iface = '"'+interface_names[mi]+'"';
2818 lfta_val[lmach] += "\tif(!strcmp(device,"+this_iface+"))\n";
2819 lfta_val[lmach] += "\t\tfta_register(\""+registration_query_names[mi]+"\", " + (lfta_reuse_options[mi]?"1":"0") + ", ";
2820 // if(interface_names[mi]=="")
2821 // lfta_val[lmach]+="DEFAULTDEV";
2823 // lfta_val[lmach]+='"'+interface_names[mi]+'"';
2824 lfta_val[lmach] += this_iface;
2827 lfta_val[lmach] += ", "+generate_alloc_name(registration_query_names[mi])
2828 +"\n#ifndef LFTA_IN_NIC\n\t,"+generate_schema_string_name(registration_query_names[mi])
2830 sprintf(tmpstr,",%d",snap_lengths[mi]);
2831 lfta_val[lmach] += tmpstr;
2833 // unsigned long long int mask=0, bpos=1;
2835 // for(f_pos=0;f_pos<prefilter_preds.size();++f_pos){
2836 // if(prefilter_preds[f_pos]->lfta_id.count(i))
2838 // bpos = bpos << 1;
2842 // sprintf(tmpstr,",%lluull",mask);
2843 sprintf(tmpstr,",%lluull",lfta_sigs[liface][i]);
2844 lfta_val[lmach]+=tmpstr;
2846 lfta_val[lmach] += ",0ull";
2849 lfta_val[lmach] += ");\n";
2853 // End of lfta prefilter stuff
2854 // --------------------------------------------------
2856 // If there is a field verifier, warn about
2857 // lack of compatibility
2858 string lfta_comment;
2860 string lfta_namespace;
2861 map<string,string> ldefs = lfta_sq->query_plan[0]->get_definitions();
2862 if(ldefs.count("comment")>0)
2863 lfta_comment = lfta_sq->defines["comment"];
2864 if(ldefs.count("Comment")>0)
2865 lfta_comment = lfta_sq->defines["Comment"];
2866 if(ldefs.count("COMMENT")>0)
2867 lfta_comment = lfta_sq->defines["COMMENT"];
2868 if(ldefs.count("title")>0)
2869 lfta_title = lfta_sq->defines["title"];
2870 if(ldefs.count("Title")>0)
2871 lfta_title = lfta_sq->defines["Title"];
2872 if(ldefs.count("TITLE")>0)
2873 lfta_title = lfta_sq->defines["TITLE"];
2874 if(ldefs.count("NAMESPACE")>0)
2875 lfta_namespace = lfta_sq->defines["NAMESPACE"];
2876 if(ldefs.count("Namespace")>0)
2877 lfta_namespace = lfta_sq->defines["Namespace"];
2878 if(ldefs.count("namespace")>0)
2879 lfta_namespace = lfta_sq->defines["namespace"];
2881 string lfta_ht_size;
2882 if(lfta_sq->query_plan[0]->node_type()== "sgah_qpn" || lfta_sq->query_plan[0]->node_type()== "rsgah_qpn")
2883 lfta_ht_size = int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE);
2884 if(ldefs.count("aggregate_slots")>0){
2885 lfta_ht_size = ldefs["aggregate_slots"];
2888 // NOTE : I'm assuming that visible lftas do not start with _fta.
2889 // -- will fail for non-visible simple selection queries.
2890 if(field_verifier != NULL && registration_query_names[mi].substr(0,1) != "_"){
2892 if(lfta_comment == "")
2893 warning_str += "\tcomment not found.\n";
2894 // Obsolete stuff that carsten wanted
2895 // if(lfta_title == "")
2896 // warning_str += "\ttitle not found.\n";
2897 // if(lfta_namespace == "")
2898 // warning_str += "\tnamespace not found.\n";
2900 vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
2902 for(fi=0;fi<flds.size();fi++){
2903 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
2905 if(warning_str != "")
2906 fprintf(stderr,"Warning, in LFTA stream %s:\n%s",
2907 registration_query_names[mi].c_str(),warning_str.c_str());
2911 // Create qtree output
2912 fprintf(qtree_output,"\t<LFTA name='%s' >\n",registration_query_names[mi].c_str());
2913 if(lfta_comment != "")
2914 fprintf(qtree_output,"\t\t<Description value='%s' />\n",lfta_comment.c_str());
2915 if(lfta_title != "")
2916 fprintf(qtree_output,"\t\t<Title value='%s' />\n",lfta_title.c_str());
2917 if(lfta_namespace != "")
2918 fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",lfta_namespace.c_str());
2919 if(lfta_ht_size != "")
2920 fprintf(qtree_output,"\t\t<HtSize value='%s' />\n",lfta_ht_size.c_str());
2922 fprintf(qtree_output,"\t\t<Host value='%s' />\n",lmach.c_str());
2924 fprintf(qtree_output,"\t\t<Host value='%s' />\n",hostname.c_str());
2925 fprintf(qtree_output,"\t\t<Interface value='%s' />\n",interface_names[mi].c_str());
2926 std::vector<tablevar_t *> itbls = lfta_sq->get_input_tables();
2927 for(int t=0;t<itbls.size();++t){
2928 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",itbls[t]->get_schema_name().c_str());
2930 // fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",interface_names[mi].c_str());
2931 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2932 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", lfta_liveness_timeouts[mi]);
2933 // write info about fields to qtree.xml
2934 vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
2936 for(fi=0;fi<flds.size();fi++){
2937 fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
2938 if(flds[fi]->get_modifier_list()->size()){
2939 fprintf(qtree_output,"mods='%s' ", flds[fi]->get_modifier_list()->to_string().c_str());
2941 fprintf(qtree_output," />\n");
2943 fprintf(qtree_output,"\t</LFTA>\n");
2949 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2950 string liface = (*mvsi).first;
2952 " if (!strcmp(device, \""+liface+"\")) \n"
2953 " lfta_prefilter = &lfta_prefilter_"+liface+"; \n"
2957 " if(lfta_prefilter == NULL){\n"
2958 " fprintf(stderr, \"Error, interface %s not found in fta_init.\\n\",device);\n"
2965 lfta_val[lmach] += "}\n\n";
2967 if(!(debug_only || hfta_only) ){
2970 lfta_flnm = lmach + "_lfta.c";
2972 lfta_flnm = hostname + "_lfta.c";
2973 if((lfta_out = fopen(lfta_flnm.c_str(),"w")) == NULL){
2974 fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
2977 fprintf(lfta_out,"%s",lfta_header.c_str());
2978 fprintf(lfta_out,"%s",lfta_prefilter_val[lmach].c_str());
2979 fprintf(lfta_out,"%s",lfta_val[lmach].c_str());
2984 // Say what are the operators which must execute
2985 if(opviews.size()>0)
2986 fprintf(stderr,"The queries use the following external operators:\n");
2987 for(i=0;i<opviews.size();++i){
2988 opview_entry *opv = opviews.get_entry(i);
2989 fprintf(stderr,"\t%s\n",opv->view_name.c_str());
2993 generate_makefile(input_file_names, nfiles, hfta_names, opviews,
2994 machine_names, schema_file_name,
2996 ifaces_db, config_dir_path,use_pads,ext_lib_string, rts_hload);
2999 fprintf(qtree_output,"</QueryNodes>\n");
3004 ////////////////////////////////////////////////////////////
// generate_makefile : emit the build and run scaffolding for the
// generated system — a Makefile that builds the rts (from the generated
// <hostname>_lfta.c) and each hfta executable, plus "runit"/"stopit"
// shell scripts that start and stop the whole process group.
// Inputs: the original query files, hfta names, external operator views,
// per-interface machine names, the schema file, and the interface
// resource database (ifdb).  Side effects: writes Makefile, runit,
// stopit in the current directory and chmods the scripts executable.
3006 void generate_makefile(vector<string> &input_file_names, int nfiles,
3007 vector<string> &hfta_names, opview_set &opviews,
3008 vector<string> &machine_names,
3009 string schema_file_name,
3010 vector<string> &interface_names,
3011 ifq_t *ifdb, string &config_dir_path,
3014 map<string, vector<int> > &rts_hload
// Turn a non-empty config dir into the "-C <dir>" flag passed back to
// translate_fta in the regeneration rule below.
3018 if(config_dir_path != ""){
3019 config_dir_path = "-C "+config_dir_path;
// Probe for optional libraries; controls extra link flags later.
3023 bool libz_exists = stat((root_path+"/lib/libz.a").c_str(),&sb) >= 0;
3024 bool libast_exists = stat((root_path+"/lib/libast.a").c_str(),&sb) >= 0;
3026 // if(libz_exists && !libast_exists){
3027 // fprintf(stderr,"Configuration ERROR: ../../lib/libz.a exists but ../../lib/libast.a dows not.\n");
3031 // Get set of operator executable files to run
3033 set<string>::iterator ssi;
3034 for(i=0;i<opviews.size();++i){
3035 opview_entry *opv = opviews.get_entry(i);
3036 if(opv->exec_fl != "") op_fls.insert(opv->exec_fl);
3039 FILE *outfl = fopen("Makefile", "w");
3041 fprintf(stderr,"Can't open Makefile for write, exiting.\n");
// Compiler definitions for the generated Makefile.
3046 ("CPP= g++ -O3 -g -I "+root_path+"/include -I "+root_path+"/include/hfta\n"
3047 "CC= gcc -O3 -g -I . -I "+root_path+"/include -I "+root_path+"/include/lfta"
3051 fprintf(outfl," -DLFTA_STATS");
3053 // Gather the set of interfaces
3054 // Also, gather "base interface names" for use in computing
3055 // the hash splitting to virtual interfaces.
3056 // TODO : must update to handle machines
3058 set<string> base_vifaces; // base interfaces of virtual interfaces
3059 map<string, string> ifmachines;
3060 map<string, string> ifattrs;
3061 for(i=0;i<interface_names.size();++i){
3062 ifaces.insert(interface_names[i]);
3063 ifmachines[interface_names[i]] = machine_names[i];
// Virtual interfaces are named <base>X<n>; strip at the last 'X' to
// recover the base interface name.
3065 size_t Xpos = interface_names[i].find_last_of("X");
3066 if(Xpos!=string::npos){
3067 string iface = interface_names[i].substr(0,Xpos);
3068 base_vifaces.insert(iface);
3070 // get interface attributes and add them to the list
3073 // Do we need to include protobuf libraries?
3074 // TODO Move to the interface library: get the libraries to include
3075 // for an interface type
3077 bool use_proto = false;
3078 bool use_bsa = false;
3079 bool use_kafka = false;
// Scan each interface's properties; a PROTO/BSA/KAFKA interface either
// enables the matching link flags or, when the runtime was built without
// that support, produces a diagnostic (the #else arms are elided here).
3082 for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
3083 string ifnm = (*ssi);
3084 vector<string> ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "InterfaceType", erri, err_str);
3085 for(int ift_i=0;ift_i<ift.size();ift_i++){
3086 if(ift[ift_i]=="PROTO"){
3087 #ifdef PROTO_ENABLED
3090 fprintf(stderr,"Runtime libraries built without PROTO support. Rebuild with PROTO_ENABLED defined in gsoptions.h\n");
3095 ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "BSA", erri, err_str);
3096 for(int ift_i=0;ift_i<ift.size();ift_i++){
3097 if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){
3101 fprintf(stderr,"Runtime libraries built without BSA support. Rebuild with BSA_ENABLED defined in gsoptions.h\n");
3106 ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "KAFKA", erri, err_str);
3107 for(int ift_i=0;ift_i<ift.size();ift_i++){
3108 if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){
3109 #ifdef KAFKA_ENABLED
3112 fprintf(stderr,"Runtime libraries built without KAFKA support. Rebuild with KAFKA_ENABLED defined in gsoptions.h\n");
// "all" target: rts plus every hfta executable.
3123 for(i=0;i<hfta_names.size();++i)
3124 fprintf(outfl," %s",hfta_names[i].c_str());
// rts link rule; the exact set of -l flags depends on the optional
// libraries detected above.
3128 "rts: lfta.o "+root_path+"/lib/libgscphost.a "+root_path+"/lib/libgscplftaaux.a "+root_path+"/lib/libgscprts.a "+root_path+"/lib/libclearinghouse.a\n"
3129 "\tg++ -O3 -g -o rts lfta.o ").c_str(), outfl);
3131 fprintf(outfl,"-L. ");
3133 ("-L"+root_path+"/lib -lgscplftaaux ").c_str(), outfl);
3135 fprintf(outfl,"-lgscppads -lpads ");
3137 "-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux -lclearinghouse -lresolv -lpthread -lgscpinterface -lz");
3139 fprintf(outfl, " -lpz -lz -lbz ");
3140 if(libz_exists && libast_exists)
3141 fprintf(outfl," -last ");
3143 fprintf(outfl, " -ldll -ldl ");
3145 #ifdef PROTO_ENABLED
3146 fprintf(outfl, " -L/usr/local/lib/ -lprotobuf-c ");
3149 fprintf(outfl, " -lbsa_stream ");
3151 #ifdef KAFKA_ENABLED
3152 fprintf(outfl, " -lrdkafka ");
3154 fprintf(outfl," -lgscpaux");
3156 fprintf(outfl," -fprofile-arcs");
// Rules to compile lfta.o and to regenerate <hostname>_lfta.c by
// re-invoking translate_fta on the original inputs.
3161 "lfta.o: %s_lfta.c\n"
3162 "\t$(CC) -o lfta.o -c %s_lfta.c\n"
3164 "%s_lfta.c: external_fcns.def %s ",hostname.c_str(), hostname.c_str(), hostname.c_str(),schema_file_name.c_str());
3165 for(i=0;i<nfiles;++i)
3166 fprintf(outfl," %s",input_file_names[i].c_str());
3168 fprintf(outfl,"\n\t%s/bin/translate_fta %s %s ",root_path.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
3170 fprintf(outfl,"\n\t%s/bin/translate_fta -h %s %s %s ", root_path.c_str(), hostname.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
3172 for(i=0;i<nfiles;++i)
3173 fprintf(outfl," %s",input_file_names[i].c_str());
3174 fprintf(outfl,"\n");
// One link + compile rule pair per hfta executable.
3176 for(i=0;i<hfta_names.size();++i)
3179 "\t$(CPP) -o %s %s.o -L"+root_path+"/lib -lgscpapp -lgscphostaux -lgscphost -lgscpinterface -lgscphftaaux -lgscphostaux -lm -lgscpaux -lclearinghouse -lresolv -lpthread -lgscpaux -lgscphftaaux -lgscpaux %s\n"
3182 "\t$(CPP) -o %s.o -c %s.cc\n"
3185 hfta_names[i].c_str(), hfta_names[i].c_str(),
3186 hfta_names[i].c_str(), hfta_names[i].c_str(), extra_libs.c_str(),
3187 hfta_names[i].c_str(), hfta_names[i].c_str(),
3188 hfta_names[i].c_str(), hfta_names[i].c_str()
// Convenience rules: symlink config files into place, and clean.
3193 "packet_schema.txt:\n"
3194 "\tln -s "+root_path+"/cfg/packet_schema.txt .\n"
3196 "external_fcns.def:\n"
3197 "\tln -s "+root_path+"/cfg/external_fcns.def .\n"
3200 "\trm -rf core rts *.o %s_lfta.c external_fcns.def packet_schema.txt").c_str(),hostname.c_str());
3201 for(i=0;i<hfta_names.size();++i)
3202 fprintf(outfl," %s %s.cc",hfta_names[i].c_str(),hfta_names[i].c_str());
3203 fprintf(outfl,"\n");
// ---- "runit" script: start gshub, the rts on each interface, every
// hfta, and every external operator view, recording pids in gs.pids.
3209 // Gather the set of interfaces
3210 // TODO : must update to handle machines
3211 // TODO : lookup interface attributes and add them as a parameter to rts process
3212 outfl = fopen("runit", "w");
3214 fprintf(stderr,"Can't open runit for write, exiting.\n");
3222 +root_path+"/bin/gshub.py> /dev/null 2>&1 &\n"
3224 "if [ ! -f gshub.log ]\n"
3226 "\techo \"Failed to start bin/gshub.py\"\n"
3229 "ADDR=`cat gshub.log`\n"
3230 "ps opgid= $! >> gs.pids\n"
3231 "./rts $ADDR default ").c_str(), outfl);
// Append the interface list (and each interface's Command attribute)
// to the rts invocation, skipping the internal _local_ interface.
3234 for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
3235 string ifnm = (*ssi);
3236 // suppress internal _local_ interface
3237 if (ifnm == "_local_")
3239 fprintf(outfl, "%s ",ifnm.c_str());
3240 vector<string> ifv = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "Command", erri, err_str);
3241 for(j=0;j<ifv.size();++j)
3242 fprintf(outfl, "%s ",ifv[j].c_str());
3244 fprintf(outfl, " &\n");
3245 fprintf(outfl, "echo $! >> gs.pids\n");
3246 for(i=0;i<hfta_names.size();++i)
3247 fprintf(outfl,"./%s $ADDR default &\n",hfta_names[i].c_str());
3249 for(j=0;j<opviews.opview_list.size();++j){
3250 fprintf(outfl,"%s/views/%s %s &\n",root_path.c_str(),opviews.opview_list[j]->exec_fl.c_str(), opviews.opview_list[j]->mangler.c_str());
3254 system("chmod +x runit");
// ---- "stopit" script: TERM (then presumably KILL — elided) each
// process group recorded in gs.pids.
3256 outfl = fopen("stopit", "w");
3258 fprintf(stderr,"Can't open stopit for write, exiting.\n");
3262 fprintf(outfl,"#!/bin/sh\n"
3264 "if [ ! -f gs.pids ]\n"
3268 "for pgid in `cat gs.pids`\n"
3270 "kill -TERM -$pgid\n"
3273 "for pgid in `cat gs.pids`\n"
3280 system("chmod +x stopit");
3282 //-----------------------------------------------
// Disabled: generator for a DAG-card script that programs the hash
// split across virtual interfaces (dagconfig hat_range).
3284 /* For now disable support for virtual interfaces
3285 outfl = fopen("set_vinterface_hash.bat", "w");
3287 fprintf(stderr,"Can't open set_vinterface_hash.bat for write, exiting.\n");
3291 // The format should be determined by an entry in the ifres.xml file,
3292 // but for now hardcode the only example I have.
3293 for(ssi=base_vifaces.begin();ssi!=base_vifaces.end();++ssi){
3294 if(rts_hload.count((*ssi))){
3295 string iface_name = (*ssi);
3296 string iface_number = "";
3297 for(j=iface_name.size()-1; j>0 && iface_number == ""; j--){
3298 if(isdigit(iface_name[j])){
3299 iface_number = iface_name[j];
3300 if(j>0 && isdigit(iface_name[j-1]))
3301 iface_number = iface_name[j-1] + iface_number;
3305 fprintf(outfl,"dagconfig -d%s -S hat_range=",iface_number.c_str());
3306 vector<int> halloc = rts_hload[iface_name];
3308 for(j=0;j<halloc.size();++j){
3311 fprintf(outfl,"%d-%d",prev_limit,halloc[j]);
3312 prev_limit = halloc[j];
3314 fprintf(outfl,"\n");
3318 system("chmod +x set_vinterface_hash.bat");
3322 // Code for implementing a local schema
// Build qpSchema, the schema visible to the hfta being processed:
// the output tables of the preceding split (lfta) queries, plus any
// other referenced tables resolved out of the global Schema.
3324 table_list qpSchema;
3326 // Load the schemas of any LFTAs.
3328 for(l=0;l<hfta_nbr;++l){
3329 stream_query *sq0 = split_queries[l];
3330 table_def *td = sq0->get_output_tabledef();
3331 qpSchema.append_table(td);
3333 // load the schemas of any other ref'd tables.
3335 vector<tablevar_t *> input_tbl_names = split_queries[hfta_nbr]->get_input_tables();
3337 for(ti=0;ti<input_tbl_names.size();++ti){
// First try the local schema; fall back to the global Schema, and
// report an error if the table is unknown to both.
3338 int tbl_ref = qpSchema.get_table_ref(input_tbl_names[ti]->get_schema_name());
3340 tbl_ref = Schema->get_table_ref(input_tbl_names[ti]->get_schema_name());
3342 fprintf(stderr,"ERROR file %s references table %s, which is not in the schema.\n",input_file_names[i].c_str(), (input_tbl_names[ti]->get_schema_name()).c_str());
3345 qpSchema.append_table(Schema->get_table(tbl_ref));
3350 // Functions related to parsing.
3353 static int split_string(char *instr,char sep, char **words,int max_words){
3359 words[nwords++] = str;
3360 while( (loc = strchr(str,sep)) != NULL){
3363 if(nwords >= max_words){
3364 fprintf(stderr,"Error in split_string, too many words discovered (max is %d)\n",max_words);
3365 nwords = max_words-1;
3367 words[nwords++] = str;