1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
16 #include<unistd.h> // for gethostname
19 #include "parse_fta.h"
20 #include "parse_schema.h"
21 #include "parse_ext_fcns.h"
22 #include"analyze_fta.h"
23 #include"query_plan.h"
24 #include"generate_lfta_code.h"
25 #include"stream_query.h"
26 #include"generate_utils.h"
28 #include"generate_nic_code.h"
39 #include <sys/types.h>
45 // to verify that some files exist.
46 #include <sys/types.h>
49 #include "parse_partn.h"
51 #include "print_plan.h"
53 // Interface to the xml parser
56 #include"field_list.h"
// Interface to the XML parser (lex/yacc generated): parse entry point,
// input stream, and debug flag.  Used here to read field_list.xml.
extern int xmlParserparse(void);
extern FILE *xmlParserin;
extern int xmlParserdebug;
// Scratch state shared with the XML parser actions: parallel
// attribute/value vectors plus the attribute/value currently being built.
std::vector<std::string> xml_attr_vec;
std::vector<std::string> xml_val_vec;
std::string xml_a, xml_v;
// Parsed XML document (leaf nodes); allocated only if field_list.xml is found.
xml_t *xml_leaves = NULL;
// Interface to the field list verifier
field_list *field_verifier = NULL;
// Size of the general-purpose scratch buffer (tmpstr) used in main().
#define TMPSTRLEN 1000
// Path separator used when extracting a default query name from a file path.
#define PATH_DELIM '/'
char tmp_schema_str[10000];
// Maximum delay between two heartbeats produced
// by a UDOP.  Used when it's not explicitly
// provided in the udop definition.
#define DEFAULT_UDOP_LIVENESS_TIMEOUT 5
// Default lfta hash table size, must be power of 2.
int DEFAULT_LFTA_HASH_TABLE_SIZE = 4096;
// Interface to FTA definition lexer and parser ...
extern int FtaParserparse(void);
extern FILE *FtaParserin;
extern int FtaParserdebug;
// Results of the most recent FTA parse: the parse tree itself and any
// DEFINE-block variable definitions.
fta_parse_t *fta_parse_result;
var_defs_t *fta_parse_defines;
// Interface to external function lexer and parser ...
extern int Ext_fcnsParserparse(void);
extern FILE *Ext_fcnsParserin;
extern int Ext_fcnsParserdebug;
// External (UDF) function signatures parsed from external_fcns.def.
ext_fcn_list *Ext_fcns;
// Interface to partition definition parser
extern int PartnParserparse();
partn_def_list_t *partn_parse_result = NULL;
// forward declaration of local utility function
// generate_makefile emits the Makefile (and runit/stopit scripts) for the
// generated query set.
// NOTE(review): the parameter list continues on lines elided from this
// excerpt before the closing ");".
void generate_makefile(vector<string> &input_file_names, int nfiles,
			vector<string> &hfta_names, opview_set &opviews,
			vector<string> &machine_names,
			string schema_file_name,
			vector<string> &interface_names,
			ifq_t *ifdb, string &config_dir_path,
			map<string, vector<int> > &rts_hload
//static int split_string(char *instr,char sep, char **words,int max_words);

// Schema-summary output stream (schema_summary.txt); opened in main() when
// the -f option is given, written by dump_summary() below.
FILE *schema_summary_output = NULL;	// query names
// Dump schema summary
// Writes a three-line summary of one query to schema_summary_output:
//   line 1: the query name
//   line 2: '|'-separated output field names
//   line 3: '|'-separated output field types (same order as the names)
// NOTE(review): the loop index f is declared on a line elided from this
// excerpt, as are the for-loops' closing braces.
void dump_summary(stream_query *str){
	fprintf(schema_summary_output,"%s\n",str->query_name.c_str());
	// Schema of the query's output table.
	table_def *sch = str->get_output_tabledef();
	vector<field_entry *> flds = sch->get_fields();
	// Emit the field names, separated by '|'.
	for(f=0;f<flds.size();++f){
		if(f>0) fprintf(schema_summary_output,"|");
		fprintf(schema_summary_output,"%s",flds[f]->get_name().c_str());
	fprintf(schema_summary_output,"\n");
	// Emit the field types, in the same order and format.
	for(f=0;f<flds.size();++f){
		if(f>0) fprintf(schema_summary_output,"|");
		fprintf(schema_summary_output,"%s",flds[f]->get_type().c_str());
	fprintf(schema_summary_output,"\n");
string hostname;		// name of current host.
// Whether the generated Makefile enables LFTA statistics (-S option).
bool generate_stats = false;
// Root of the GS-lite tree; default assumes two levels down, overridden by -R.
string root_path = "../..";
162 int main(int argc, char **argv){
163 char tmpstr[TMPSTRLEN];
167 set<int>::iterator si;
169 vector<string> registration_query_names; // for lfta.c registration
170 map<string, vector<int> > mach_query_names; // list queries of machine
171 vector<int> snap_lengths; // for lfta.c registration
172 vector<string> interface_names; // for lfta.c registration
173 vector<string> machine_names; // machine of interface
174 vector<bool> lfta_reuse_options; // for lfta.c registration
175 vector<int> lfta_liveness_timeouts; // fot qtree.xml generation
176 vector<string> hfta_names; // hfta cource code names, for
177 // creating make file.
178 vector<string> qnames; // ensure unique names
179 map<string, int> lfta_names; // keep track of unique lftas.
182 // set these to 1 to debug the parser
184 Ext_fcnsParserdebug = 0;
186 FILE *lfta_out; // lfta.c output.
187 FILE *fta_in; // input file
188 FILE *table_schemas_in; // source tables definition file
189 FILE *query_name_output; // query names
190 FILE *qtree_output; // interconnections of query nodes
192 // -------------------------------
193 // Handling of Input Arguments
194 // -------------------------------
195 char optstr[] = "BDpLC:l:HNQMf:PSh:n:cR:";
196 const char *usage_str = "Usage: %s [-B] [-D] [-p] [-L] [-N] [-H] [-Q] [-M] [-C <config directory>] [-l <library directory>] [-f] [-P] [-S] [-n n_virtual_interfaces] [-h hostname] [-R root_path] [schema_file] input_file [input file ...]\n"
197 "\t[-B] : debug only (don't create output files)\n"
198 "\t[-D] : distributed mode (will use cluster.ifq instead of local .ifq file)\n"
199 "\t[-p] : partitioned mode (will use partition information in partition.txt to drive query optimization)\n"
200 "\t[-L] : use live_hosts.txt file to restrict queries to a set of live hosts\n"
201 "\t[-C] : use <config directory> for definition files\n"
202 "\t[-l] : use <library directory> for library queries\n"
203 "\t[-N] : output query names in query_names.txt\n"
204 "\t[-H] : create HFTA only (no schema_file)\n"
205 "\t[-Q] : use query name for hfta suffix\n"
206 "\t[-M] : generate make file and runit, stopit scripts\n"
207 "\t[-S] : enable LFTA statistics (alters Makefile).\n"
208 "\t[-f] : Output schema summary to schema_summary.txt\n"
209 "\t[-P] : link with PADS\n"
210 "\t[-h] : override host name.\n"
211 "\t[-c] : clean out Makefile and hfta_*.cc first.\n"
212 "\t[-R] : path to root of GS-lite\n"
215 // parameters gathered from command line processing
216 string external_fcns_path;
217 // string internal_fcn_path;
218 string config_dir_path;
219 string library_path = "./";
220 vector<string> input_file_names;
221 string schema_file_name;
222 bool debug_only = false;
223 bool hfta_only = false;
224 bool output_query_names = false;
225 bool output_schema_summary=false;
226 bool numeric_hfta_flname = true;
227 bool create_makefile = false;
228 bool distributed_mode = false;
229 bool partitioned_mode = false;
230 bool use_live_hosts_file = false;
231 bool use_pads = false;
232 bool clean_make = false;
233 int n_virtual_interfaces = 1;
236 while((chopt = getopt(argc,argv,optstr)) != -1){
242 distributed_mode = true;
245 partitioned_mode = true;
248 use_live_hosts_file = true;
252 config_dir_path = string(optarg) + string("/");
256 library_path = string(optarg) + string("/");
259 output_query_names = true;
262 numeric_hfta_flname = false;
265 if(schema_file_name == ""){
270 output_schema_summary=true;
273 create_makefile=true;
294 n_virtual_interfaces = atoi(optarg);
295 if(n_virtual_interfaces < 1 || n_virtual_interfaces > 128){
296 fprintf(stderr,"Warning, %d virtual interfaces specified, valid values are between 1 and 128 inclusive\n",n_virtual_interfaces);
297 n_virtual_interfaces = 1;
302 fprintf(stderr,"Error, argument %c not recognized.\n",optopt);
303 fprintf(stderr,"%s\n", usage_str);
306 fprintf(stderr, "Argument was %c\n", optopt);
307 fprintf(stderr,"Invalid arguments\n");
308 fprintf(stderr,"%s\n", usage_str);
314 for (int i = 0; i < argc; ++i) {
315 if((schema_file_name == "") && !hfta_only){
316 schema_file_name = argv[i];
318 input_file_names.push_back(argv[i]);
322 if(input_file_names.size() == 0){
323 fprintf(stderr,"%s\n", usage_str);
328 string clean_cmd = "rm Makefile hfta_*.cc";
329 int clean_ret = system(clean_cmd.c_str());
331 fprintf(stderr,"Warning, return value %d when trying to clean out old source files.\n", clean_ret);
336 nic_prop_db *npdb = new nic_prop_db(config_dir_path);
338 // Open globally used file names.
340 // prepend config directory to schema file
341 schema_file_name = config_dir_path + schema_file_name;
342 external_fcns_path = config_dir_path + string("external_fcns.def");
343 string ifx_fname = config_dir_path + string("ifres.xml");
345 // Find interface query file(s).
347 gethostname(tmpstr,TMPSTRLEN);
350 hostname_len = strlen(tmpstr);
351 string ifq_fname = config_dir_path + (distributed_mode ? "cluster" : hostname) + string(".ifq");
352 vector<string> ifq_fls;
354 ifq_fls.push_back(ifq_fname);
357 // Get the field list, if it exists
358 string flist_fl = config_dir_path + "field_list.xml";
360 if((flf_in = fopen(flist_fl.c_str(), "r")) != NULL) {
361 fprintf(stderr,"Found field list file %s\n",flist_fl.c_str());
362 xml_leaves = new xml_t();
363 xmlParser_setfileinput(flf_in);
364 if(xmlParserparse()){
365 fprintf(stderr,"WARNING, could not parse field list file %s\n",flist_fl.c_str());
367 field_verifier = new field_list(xml_leaves);
372 if((table_schemas_in = fopen(schema_file_name.c_str(), "r")) == NULL) {
373 fprintf(stderr,"Can't open schema file %s\n%s\n",schema_file_name.c_str(),strerror(errno));
379 if(!(debug_only || hfta_only)){
380 if((lfta_out = fopen("lfta.c","w")) == NULL){
381 fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
387 // Get the output specification file.
389 // query, operator, operator_param, directory, bucketwidth, partitioning_se, n_partitions
390 string ospec_fl = "output_spec.cfg";
392 vector<ospec_str *> output_specs;
393 multimap<string, int> qname_to_ospec;
394 if((osp_in = fopen(ospec_fl.c_str(), "r")) != NULL) {
397 while(fgets(tmpstr,TMPSTRLEN,osp_in)){
399 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
400 if(tmpstr[0]!='\n' && tmpstr[0]!='\r' && tmpstr[0]!='\0' && tmpstr[0]!='#'){
402 // make operator type lowercase
404 for(tmpc=flds[1];*tmpc!='\0';++tmpc)
405 *tmpc = tolower(*tmpc);
407 ospec_str *tmp_ospec = new ospec_str();
408 tmp_ospec->query = flds[0];
409 tmp_ospec->operator_type = flds[1];
410 tmp_ospec->operator_param = flds[2];
411 tmp_ospec->output_directory = flds[3];
412 tmp_ospec->bucketwidth = atoi(flds[4]);
413 tmp_ospec->partitioning_flds = flds[5];
414 tmp_ospec->n_partitions = atoi(flds[6]);
415 qname_to_ospec.insert(pair<string,int>(tmp_ospec->query,output_specs.size()));
416 output_specs.push_back(tmp_ospec);
418 fprintf(stderr,"Warning, line %d corrupted in output_spec.cfg, has %d fields.\n",o_lineno,nflds);
424 fprintf(stderr,"output_spec.cfg not found. The query set has no output. exiting.\n");
429 string pspec_fl = "hfta_parallelism.cfg";
431 map<string, int> hfta_parallelism;
432 if((psp_in = fopen(pspec_fl.c_str(), "r")) != NULL){
435 while(fgets(tmpstr,TMPSTRLEN,psp_in)){
436 bool good_entry = true;
438 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
440 string hname = flds[0];
441 int par = atoi(flds[1]);
442 if(par <= 0 || par > n_virtual_interfaces){
443 fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must be between 1 and %d\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
446 if(good_entry && n_virtual_interfaces % par != 0){
447 fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must divide the number of virtual interfaces (%d), ignoring.\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
451 hfta_parallelism[hname] = par;
455 fprintf(stderr,"WARNING, no file %s found, using single copies of hftas.\n",pspec_fl.c_str());
459 // LFTA hash table sizes
460 string htspec_fl = "lfta_htsize.cfg";
461 FILE *htsp_in = NULL;
462 map<string, int> lfta_htsize;
463 if((htsp_in = fopen(htspec_fl.c_str(), "r")) != NULL){
466 while(fgets(tmpstr,TMPSTRLEN,htsp_in)){
467 bool good_entry = true;
469 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
471 string lfta_name = flds[0];
472 int htsz = atoi(flds[1]);
474 lfta_htsize[lfta_name] = htsz;
476 fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, htsize is %d, must be larger than 0.\n",o_lineno,htspec_fl.c_str(),htsz);
481 fprintf(stderr,"WARNING, no file %s found, using default LFTA hash table sizes.\n",htspec_fl.c_str());
// LFTA virtual interface hash split
485 string rtlspec_fl = "rts_load.cfg";
487 map<string, vector<int> > rts_hload;
488 if((rtl_in = fopen(rtlspec_fl.c_str(), "r")) != NULL){
493 while(fgets(tmpstr,TMPSTRLEN,rtl_in)){
494 bool good_entry = true;
498 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
500 iface_name = flds[0];
503 for(j=1;j<nflds;++j){
504 int h = atoi(flds[j]);
508 hload.push_back(cumm_h);
514 rts_hload[iface_name] = hload;
516 fprintf(stderr,"Warning, line %d of %s is malformed, skipping.\n",r_lineno, rtlspec_fl.c_str());
523 if(output_query_names){
524 if((query_name_output = fopen("query_names.txt","w")) == NULL){
525 fprintf(stderr,"Can't open output file %s\n%s\n","query_names.txt",strerror(errno));
530 if(output_schema_summary){
531 if((schema_summary_output = fopen("schema_summary.txt","w")) == NULL){
532 fprintf(stderr,"Can't open output file %s\n%s\n","schema_summary.txt",strerror(errno));
537 if((qtree_output = fopen("qtree.xml","w")) == NULL){
538 fprintf(stderr,"Can't open output file %s\n%s\n","qtree.xml",strerror(errno));
541 fprintf(qtree_output,"<?xml version='1.0' encoding='ISO-8859-1'?>\n");
542 fprintf(qtree_output,"<?xml-stylesheet type='text/xsl' href='qtree.xsl'?>\n");
543 fprintf(qtree_output,"<QueryNodes>\n");
546 // Get an initial Schema
549 // Parse the table schema definitions.
550 fta_parse_result = new fta_parse_t();
551 FtaParser_setfileinput(table_schemas_in);
552 if(FtaParserparse()){
553 fprintf(stderr,"Table schema parse failed.\n");
556 if(fta_parse_result->parse_type != TABLE_PARSE){
557 fprintf(stderr,"ERROR, file %s is not a table definition file.\n",schema_file_name.c_str());
560 Schema = fta_parse_result->tables;
562 // Ensure that all schema_ids, if set, are distinct.
563 // Obsolete? There is code elsewhere to ensure that schema IDs are
564 // distinct on a per-interface basis.
568 for(int t=0;t<Schema->size();++t){
569 int sch_id = Schema->get_table(t)->get_schema_id();
571 if(found_ids.find(sch_id) != found_ids.end()){
572 dup_ids.insert(sch_id);
574 found_ids.insert(sch_id);
577 if(dup_ids.size()>0){
578 fprintf(stderr, "Error, the schema has duplicate schema_ids:");
579 for(auto dit=dup_ids.begin();dit!=dup_ids.end(); ++dit)
580 fprintf(stderr," %d",(*dit));
581 fprintf(stderr,"\n");
588 // Process schema field inheritance
590 retval = Schema->unroll_tables(err_str);
592 fprintf(stderr,"Error processing schema field inheritance:\n %s\n", err_str.c_str() );
596 // hfta only => we will try to fetch schemas from the registry.
597 // therefore, start off with an empty schema.
598 Schema = new table_list();
602 // Open and parse the external functions file.
603 Ext_fcnsParserin = fopen(external_fcns_path.c_str(),"r");
604 if(Ext_fcnsParserin == NULL){
605 fprintf(stderr,"Warning, can't find external functions definition file (external_fcns.def).\n");
606 Ext_fcns = new ext_fcn_list();
608 if(Ext_fcnsParserparse()){
609 fprintf(stderr,"Warning, error parsing external functions definition file (external_fcns.def).\n");
610 Ext_fcns = new ext_fcn_list();
613 if(Ext_fcns->validate_fcns(err_str)){
614 fprintf(stderr,"Error processing external functions definition file (external_fcns.def):\n%s\n",err_str.c_str());
618 // Open and parse the interface resources file.
619 // ifq_t *ifaces_db = new ifq_t();
621 // if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
622 // fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
623 // ifx_fname.c_str(), ierr.c_str());
626 // if(ifaces_db->load_ifqs(ifq_fname, ierr)){
627 // fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
628 // ifq_fname.c_str(), ierr.c_str());
633 // The LFTA code string.
634 // Put the standard preamble here.
635 // NOTE: the hash macros, fcns should go into the run time
636 map<string, string> lfta_val;
637 map<string, string> lfta_prefilter_val;
640 "#include <limits.h>\n"
641 "#include \"rts.h\"\n"
642 "#include \"fta.h\"\n"
643 "#include \"lapp.h\"\n"
644 "#include \"rts_udaf.h\"\n"
645 "#include<stdio.h>\n"
646 "#include <float.h>\n"
647 "#include \"rdtsc.h\"\n"
648 "#include \"watchlist.h\"\n\n"
651 // Get any locally defined parsing headers
653 memset(&glob_result, 0, sizeof(glob_result));
655 // do the glob operation TODO should be from GSROOT
656 int return_value = glob("../../include/lfta/local/*h", GLOB_TILDE, NULL, &glob_result);
657 if(return_value == 0){
659 for(size_t i = 0; i < glob_result.gl_pathc; ++i) {
661 int nflds = split_string(glob_result.gl_pathv[i],'/',flds,1000);
662 lfta_header += "#include \"local/"+string(flds[nflds-1])+"\"\n";
666 fprintf(stderr,"Warning, glob on ../../include/lfta/local/*h failed.\n");
670 "#define IS_FILLED(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> ((bucket & 15)<<1)))\n"
671 "#define IS_NEW(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> (((bucket & 15) << 1) + 1)))\n"
672 "#define SET_EMPTY(bitmap,bucket) (bitmap[bucket >> 4] &= (~(0x80000000 >> ((bucket & 15)<<1))))\n"
673 "#define SET_FILLED_AND_NEW(bitmap,bucket) (bitmap[bucket >> 4] |= (0xC0000000 >> ((bucket & 15)<<1)))\n"
678 "gs_uint64_t (*lfta_prefilter)(void *pkt) = NULL; // set at fta_init\n"
680 "#define SLOT_FILLED 0x04\n"
681 "#define SLOT_GEN_BITS 0x03\n"
682 "#define SLOT_HASH_BITS 0xfffffff8\n"
683 "#define SET_BF_BIT(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num)) + (bf_index)) >> 3] |= (0x80 >> ((((bucket) * (bf_num)) + (bf_index)) & 7)))\n"
684 "#define IS_BF_SET(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num))+(bf_index)) >> 3] & (0x80 >> ((((bucket) * (bf_num))+(bf_index)) & 7)))\n"
685 "#define SET_BF_EMPTY(bf_table, bf_num, bf_index, bucket) (bf_table[((bucket * bf_num)+bf_index) >> 3] &= (~0x80 >> (((bucket * bf_num)+bf_index) & 7)))\n"
688 "#define lfta_BOOL_to_hash(x) (x)\n"
689 "#define lfta_USHORT_to_hash(x) (x)\n"
690 "#define lfta_UINT_to_hash(x) (x)\n"
691 "#define lfta_IP_to_hash(x) (x)\n"
692 "#define lfta_IPV6_to_hash(x) ( (x.v[0]) ^ (x.v[1]) ^ (x.v[2]) ^ (x.v[3]))\n"
693 "#define lfta_INT_to_hash(x) (gs_uint32_t)(x)\n"
694 "#define lfta_ULLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
695 "#define lfta_LLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
696 "#define lfta_FLOAT_to_hash(x) ( (( ((gs_uint64_t)(x)) >>32)&0xffffffff) ^ (((gs_uint64_t)(x))&0xffffffff) )\n"
697 "static gs_uint32_t lfta_V_STR_to_hash(struct gs_string x){\n"
698 " gs_uint32_t i,ret=0,tmp_sum = 0;\n"
699 " for(i=0;i<x.length;++i){\n"
700 " tmp_sum |= (x.data[i]) << (8*(i%4));\n"
706 " if((i%4)!=0) ret ^=tmp_sum;\n"
712 //////////////////////////////////////////////////////////////////
713 ///// Get all of the query parse trees
717 int hfta_count = 0; // for numeric suffixes to hfta .cc files
719 //---------------------------
720 // Global info needed for post processing.
722 // Set of operator views ref'd in the query set.
724 // lftas on a per-machine basis.
725 map<string, vector<stream_query *> > lfta_mach_lists;
726 int nfiles = input_file_names.size();
727 vector<stream_query *> hfta_list; // list of hftas.
728 map<string, stream_query *> sq_map; // map from query name to stream query.
731 //////////////////////////////////////////
733 // Open and parse the interface resources file.
734 ifq_t *ifaces_db = new ifq_t();
736 if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
737 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
738 ifx_fname.c_str(), ierr.c_str());
741 if(ifaces_db->load_ifqs(ifq_fls[0], ierr)){
742 fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
743 ifq_fls[0].c_str(), ierr.c_str());
747 map<string, string> qname_to_flname; // for detecting duplicate query names
751 // Parse the files to create a vector of parse trees.
752 // Load qnodes with information to perform a topo sort
753 // based on query dependencies.
754 vector<query_node *> qnodes; // for topo sort.
755 map<string,int> name_node_map; // map query name to qnodes entry
756 for(i=0;i<input_file_names.size();i++){
758 if((fta_in = fopen(input_file_names[i].c_str(), "r")) == NULL) {
759 fprintf(stderr,"Can't open input file %s\n%s",input_file_names[i].c_str(),strerror(errno));
762 fprintf(stderr,"Parsing file %s\n",input_file_names[i].c_str());
764 // Parse the FTA query
765 fta_parse_result = new fta_parse_t();
766 FtaParser_setfileinput(fta_in);
767 if(FtaParserparse()){
768 fprintf(stderr,"FTA parse failed.\n");
771 if(fta_parse_result->parse_type != QUERY_PARSE){
772 fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
776 // returns a list of parse trees
777 vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
778 for(p=0;p<qlist.size();++p){
779 table_exp_t *fta_parse_tree = qlist[p];
780 // query_parse_trees.push_back(fta_parse_tree);
782 // compute the default name -- extract from query name
783 strcpy(tmpstr,input_file_names[i].c_str());
784 char *qname = strrchr(tmpstr,PATH_DELIM);
789 char *qname_end = strchr(qname,'.');
790 if(qname_end != NULL) *qname_end = '\0';
791 string qname_str = qname;
792 string imputed_qname = impute_query_name(fta_parse_tree, qname_str);
// Determine visibility. Should I be attaching all of the output methods?
795 if(qname_to_ospec.count(imputed_qname)>0)
796 fta_parse_tree->set_visible(true);
798 fta_parse_tree->set_visible(false);
// Create a manipulable representation of the parse tree.
802 // the qnode inherits the visibility assigned to the parse tree.
803 int pos = qnodes.size();
804 qnodes.push_back( new query_node(pos, imputed_qname, input_file_names[i], fta_parse_tree ));
805 name_node_map[ qnodes[pos]->name ] = pos;
806 //printf("name_node_map[ %s ] = %d\n",qnodes[pos]->name.c_str(),pos);
807 // qnames.push_back(impute_query_name(fta_parse_tree, qname_str));
808 // qfiles.push_back(i);
810 // Check for duplicate query names
811 // NOTE : in hfta-only generation, I should
812 // also check with the names of the registered queries.
813 if(qname_to_flname.count(qnodes[pos]->name) > 0){
814 fprintf(stderr,"ERROR duplicate query name %s in files %s and %s.\n",
815 qnodes[pos]->name.c_str(), (qname_to_flname[qnodes[pos]->name]).c_str(), input_file_names[i].c_str());
818 if(Schema->find_tbl(qnodes[pos]->name) >= 0){
819 fprintf(stderr,"ERROR duplicate query name %s (file %s) was already defined as a PROTOCOL.\n",
820 qnodes[pos]->name.c_str(), input_file_names[i].c_str());
823 qname_to_flname[qnodes[pos]->name] = input_file_names[i].c_str();
829 // Add the library queries
832 for(pos=0;pos<qnodes.size();++pos){
834 for(fi = 0;fi<qnodes[pos]->refd_tbls.size();++fi){
835 string src_tbl = qnodes[pos]->refd_tbls[fi];
836 if(qname_to_flname.count(src_tbl) == 0){
837 int last_sep = src_tbl.find_last_of('/');
838 if(last_sep != string::npos){
839 fprintf(stderr,"Finding library query %s for query %s\n",src_tbl.c_str(),qnodes[pos]->name.c_str());
840 string target_qname = src_tbl.substr(last_sep+1);
841 string qpathname = library_path + src_tbl + ".gsql";
842 if((fta_in = fopen(qpathname.c_str(), "r")) == NULL) {
843 fprintf(stderr,"Can't open library file %s, referenced by query %s in file %s\n\t%s\n",qpathname.c_str(),qnodes[pos]->name.c_str(), qname_to_flname[qnodes[pos]->name].c_str(), strerror(errno));
845 fprintf(stderr,"After exit\n");
847 fprintf(stderr,"Parsing file %s\n",qpathname.c_str());
848 // Parse the FTA query
849 fta_parse_result = new fta_parse_t();
850 FtaParser_setfileinput(fta_in);
851 if(FtaParserparse()){
852 fprintf(stderr,"FTA parse failed.\n");
855 if(fta_parse_result->parse_type != QUERY_PARSE){
856 fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
860 map<string, int> local_query_map;
861 vector<string> local_query_names;
862 vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
863 for(p=0;p<qlist.size();++p){
864 table_exp_t *fta_parse_tree = qlist[p];
865 fta_parse_tree->set_visible(false); // assumed to not produce output
866 string imputed_qname = impute_query_name(fta_parse_tree, target_qname);
867 if(imputed_qname == target_qname)
868 imputed_qname = src_tbl;
869 if(local_query_map.count(imputed_qname)>0){
870 fprintf(stderr,"ERROR, multiple queries named %s in library file %s\n",imputed_qname.c_str(), qpathname.c_str());
873 local_query_map[ imputed_qname ] = p;
874 local_query_names.push_back(imputed_qname);
877 if(local_query_map.count(src_tbl)==0){
878 fprintf(stderr,"ERROR, library query file %s has no query named %s\n",qpathname.c_str(),target_qname.c_str());
882 vector<int> worklist;
883 set<int> added_queries;
884 vector<query_node *> new_qnodes;
885 worklist.push_back(local_query_map[target_qname]);
886 added_queries.insert(local_query_map[target_qname]);
888 int qpos = qnodes.size();
889 for(qq=0;qq<worklist.size();++qq){
890 int q_id = worklist[qq];
891 query_node *new_qnode = new query_node(qpos+qq, local_query_names[q_id], qpathname, qlist[q_id] );
892 new_qnodes.push_back( new_qnode);
893 vector<string> refd_tbls = new_qnode->refd_tbls;
895 for(ff = 0;ff<refd_tbls.size();++ff){
896 if(local_query_map.count(refd_tbls[ff])>0 && added_queries.count(local_query_map[refd_tbls[ff]])==0){
898 if(name_node_map.count(refd_tbls[ff])>0){
899 fprintf(stderr,"ERROR, query %s occurs both in the regular query set, file %s, and in library file %s\n",refd_tbls[ff].c_str(), qname_to_flname[refd_tbls[ff]].c_str(), qpathname.c_str() );
902 worklist.push_back(local_query_map[refd_tbls[ff]]);
908 for(qq=0;qq<new_qnodes.size();++qq){
909 int qpos = qnodes.size();
910 qnodes.push_back(new_qnodes[qq]);
911 name_node_map[qnodes[qpos]->name ] = qpos;
912 qname_to_flname[qnodes[qpos]->name ] = qpathname;
926 //---------------------------------------
931 string udop_missing_sources;
932 for(i=0;i<qnodes.size();++i){
934 for(fi = 0;fi<qnodes[i]->refd_tbls.size();++fi){
935 int sid = Schema->find_tbl(qnodes[i]->refd_tbls[fi]);
937 if(Schema->get_schema_type(sid) == OPERATOR_VIEW_SCHEMA){
938 if(name_node_map.count(qnodes[i]->refd_tbls[fi]) == 0){
939 int pos = qnodes.size();
940 qnodes.push_back( new query_node(pos, qnodes[i]->refd_tbls[fi], Schema));
941 name_node_map[ qnodes[pos]->name ] = pos;
942 qnodes[pos]->is_externally_visible = false; // its visible
943 // Need to mark the source queries as visible.
945 string missing_sources = "";
946 for(si=0;si<qnodes[pos]->refd_tbls.size();++si){
947 string src_tbl = qnodes[pos]->refd_tbls[si];
948 if(name_node_map.count(src_tbl)==0){
949 missing_sources += src_tbl + " ";
952 if(missing_sources != ""){
953 udop_missing_sources += "\tUDOP "+qnodes[pos]->name+" references undefined tables "+missing_sources+"\n";
960 if(udop_missing_sources != ""){
961 fprintf(stderr,"ERROR, User Defined OPerators reference source tables that are not part of the query set:\n%s",udop_missing_sources.c_str());
967 ////////////////////////////////////////////////////////////////////
968 /// Check parse trees to verify that some
969 /// global properties are met :
970 /// if q1 reads from q2, then
971 /// q2 is processed before q1
972 /// q1 can supply q2's parameters
973 /// Verify there is no cycle in the reads-from graph.
975 // Compute an order in which to process the
978 // Start by building the reads-from lists.
981 for(i=0;i<qnodes.size();++i){
983 vector<string> refd_tbls = qnodes[i]->refd_tbls;
984 for(fi = 0;fi<refd_tbls.size();++fi){
985 if(name_node_map.count(refd_tbls[fi])>0){
986 //printf("query %d (%s) reads from %s (%d)\n", i, qnodes[i]->name.c_str(),refd_tbls[fi].c_str(),name_node_map[refd_tbls[fi]]);
987 (qnodes[i]->reads_from).insert(name_node_map[refd_tbls[fi]]);
993 // If one query reads the result of another,
994 // check for parameter compatibility. Currently it must
995 // be an exact match. I will move to requiring
996 // containment after re-ordering, but will require
997 // some analysis for code generation which is not
999 //printf("There are %d query nodes.\n",qnodes.size());
1002 for(i=0;i<qnodes.size();++i){
1003 vector<var_pair_t *> target_params = qnodes[i]->params;
1004 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1005 vector<var_pair_t *> source_params = qnodes[(*si)]->params;
1006 if(target_params.size() != source_params.size()){
1007 fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
1011 for(p=0;p<target_params.size();++p){
1012 if(! (target_params[p]->name == source_params[p]->name &&
1013 target_params[p]->val == source_params[p]->val ) ){
1014 fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
1023 // Start by counting inedges.
1024 for(i=0;i<qnodes.size();++i){
1025 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1026 qnodes[(*si)]->n_consumers++;
1030 // The roots are the nodes with indegree zero.
1032 for(i=0;i<qnodes.size();++i){
1033 if(qnodes[i]->n_consumers == 0){
1034 if(qnodes[i]->is_externally_visible == false){
1035 fprintf(stderr,"WARNING: query %s (file %s) is a root query but it isn't visible. Ignored.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str());
1041 // Remove the parts of the subtree that produce no output.
1042 set<int> valid_roots;
1043 set<int> discarded_nodes;
1044 set<int> candidates;
1045 while(roots.size() >0){
1046 for(si=roots.begin();si!=roots.end();++si){
1047 if(qnodes[(*si)]->is_externally_visible){
1048 valid_roots.insert((*si));
1050 discarded_nodes.insert((*si));
1051 set<int>::iterator sir;
1052 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1053 qnodes[(*sir)]->n_consumers--;
1054 if(qnodes[(*sir)]->n_consumers == 0)
1055 candidates.insert( (*sir));
1062 roots = valid_roots;
1063 if(discarded_nodes.size()>0){
1064 fprintf(stderr,"Warning, the following queries were discarded because they produce no output:\n");
1066 for(si=discarded_nodes.begin();si!=discarded_nodes.end();++si){
1067 if(di>0 && (di%8)==0) fprintf(stderr,"\n");
1069 fprintf(stderr," %s",qnodes[(*si)]->name.c_str());
1071 fprintf(stderr,"\n");
1074 // Compute the sources_to set, ignoring discarded nodes.
1075 for(i=0;i<qnodes.size();++i){
1076 if(discarded_nodes.count(i)==0)
1077 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1078 qnodes[(*si)]->sources_to.insert(i);
1083 // Find the nodes that are shared by multiple visible subtrees.
// The roots become inferred visible nodes.
1086 // Find the visible nodes.
1087 vector<int> visible_nodes;
1088 for(i=0;i<qnodes.size();i++){
1089 if(qnodes[i]->is_externally_visible){
1090 visible_nodes.push_back(i);
1094 // Find UDOPs referenced by visible nodes.
1096 for(i=0;i<visible_nodes.size();++i){
1097 workq.push_back(visible_nodes[i]);
1099 while(!workq.empty()){
1100 int node = workq.front();
1102 set<int>::iterator children;
1103 if(qnodes[node]->is_udop && qnodes[node]->is_externally_visible == false){
1104 qnodes[node]->is_externally_visible = true;
1105 visible_nodes.push_back(node);
1106 for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1107 if(qnodes[(*children)]->is_externally_visible == false){
1108 qnodes[(*children)]->is_externally_visible = true;
1109 visible_nodes.push_back((*children));
1113 for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1114 workq.push_back((*children));
1121 for(i=0;i<qnodes.size();i++){
1122 qnodes[i]->subtree_roots.clear();
1125 // Walk the tree defined by a visible node, not descending into
1126 // subtrees rooted by a visible node. Mark the node visited with
1127 // the visible node ID.
1128 for(i=0;i<visible_nodes.size();++i){
1130 vroots.insert(visible_nodes[i]);
1131 while(vroots.size()>0){
1132 for(si=vroots.begin();si!=vroots.end();++si){
1133 qnodes[(*si)]->subtree_roots.insert(visible_nodes[i]);
1135 set<int>::iterator sir;
1136 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1137 if(! qnodes[(*sir)]->is_externally_visible){
1138 candidates.insert( (*sir));
1142 vroots = candidates;
1146 // Find the nodes in multiple visible node subtrees, but with no parent
1147 // that is in multiple visible node subtrees. Mark these as inferred visible nodes.
1148 done = true; // until proven otherwise
1149 for(i=0;i<qnodes.size();i++){
1150 if(qnodes[i]->subtree_roots.size()>1){
1151 bool is_new_root = true;
1152 set<int>::iterator sir;
1153 for(sir=qnodes[i]->sources_to.begin(); sir!=qnodes[i]->sources_to.end();++sir){
1154 if(qnodes[(*sir)]->subtree_roots.size()>1)
1155 is_new_root = false;
1158 qnodes[i]->is_externally_visible = true;
1159 qnodes[i]->inferred_visible_node = true;
1160 visible_nodes.push_back(i);
1171 // get visible nodes in topo ordering.
1172 // for(i=0;i<qnodes.size();i++){
1173 // qnodes[i]->n_consumers = qnodes[i]->sources_to.size();
1175 vector<int> process_order;
1176 while(roots.size() >0){
1177 for(si=roots.begin();si!=roots.end();++si){
1178 if(discarded_nodes.count((*si))==0){
1179 process_order.push_back( (*si) );
1181 set<int>::iterator sir;
1182 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1183 qnodes[(*sir)]->n_consumers--;
1184 if(qnodes[(*sir)]->n_consumers == 0)
1185 candidates.insert( (*sir));
1193 //printf("process_order.size() =%d\n",process_order.size());
1195 // Search for cyclic dependencies
1197 for(i=0;i<qnodes.size();++i){
1198 if(discarded_nodes.count(i)==0 && qnodes[i]->n_consumers > 0){
1199 if(found_dep.size() != 0) found_dep += ", ";
1200 found_dep += "query "+qnodes[i]->name+" (file "+qnodes[i]->file+")";
1203 if(found_dep.size()>0){
1204 fprintf(stderr,"ERROR, the following queries contain a cyclic reads-from dependency:\n%s\n",found_dep.c_str());
1208 // Get a list of query sets, in the order to be processed.
1209 // Start at visible root and do bfs.
1210 // The query set includes queries referenced indirectly,
1211 // as sources for user-defined operators. These are needed
1212 // to ensure that they are added to the schema, but are not part
1213 // of the query tree.
1215 // stream_node_sets contains queries reachable only through the
1216 // FROM clause, so I can tell which queries to add to the stream
1217 // query. (DISABLED, UDOPS are integrated, does this cause problems?)
1219 // NOTE: this code works because in order for data to be
1220 // read by multiple hftas, the node must be externally visible.
1221 // But visible nodes define roots of process sets.
1222 // internally visible nodes can feed data only
1223 // to other nodes in the same query file.
1224 // Therefore, any access can be restricted to a file,
1225 // hfta output sharing is done only on roots
1226 // never on interior nodes.
1231 // Compute the base collection of hftas.
1232 vector<hfta_node *> hfta_sets;
1233 map<string, int> hfta_name_map;
1234 // vector< vector<int> > process_sets;
1235 // vector< set<int> > stream_node_sets;
1236 reverse(process_order.begin(), process_order.end()); // get listing in reverse order.
1237 // i.e. process leaves 1st.
1238 for(i=0;i<process_order.size();++i){
1239 if(qnodes[process_order[i]]->is_externally_visible == true){
1240 //printf("Visible.\n");
1241 int root = process_order[i];
1242 hfta_node *hnode = new hfta_node();
1243 hnode->name = qnodes[root]-> name;
1244 hnode->source_name = qnodes[root]-> name;
1245 hnode->is_udop = qnodes[root]->is_udop;
1246 hnode->inferred_visible_node = qnodes[root]->inferred_visible_node;
1248 vector<int> proc_list; proc_list.push_back(root);
1249 // Ensure that nodes are added only once.
1250 set<int> proc_set; proc_set.insert(root);
1251 roots.clear(); roots.insert(root);
1253 while(roots.size()>0){
1254 for(si=roots.begin();si!=roots.end();++si){
1255 //printf("Processing root %d\n",(*si));
1256 set<int>::iterator sir;
1257 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1258 //printf("reads fom %d\n",(*sir));
1259 if(qnodes[(*sir)]->is_externally_visible==false){
1260 candidates.insert( (*sir) );
1261 if(proc_set.count( (*sir) )==0){
1262 proc_set.insert( (*sir) );
1263 proc_list.push_back( (*sir) );
1272 reverse(proc_list.begin(), proc_list.end());
1273 hnode->query_node_indices = proc_list;
1274 hfta_name_map[hnode->name] = hfta_sets.size();
1275 hfta_sets.push_back(hnode);
1279 // Compute the reads_from / sources_to graphs for the hftas.
1281 for(i=0;i<hfta_sets.size();++i){
1282 hfta_node *hnode = hfta_sets[i];
1283 for(q=0;q<hnode->query_node_indices.size();q++){
1284 query_node *qnode = qnodes[hnode->query_node_indices[q]];
1285 for(s=0;s<qnode->refd_tbls.size();++s){
1286 if(hfta_name_map.count(qnode->refd_tbls[s])){
1287 int other_hfta = hfta_name_map[qnode->refd_tbls[s]];
1288 hnode->reads_from.insert(other_hfta);
1289 hfta_sets[other_hfta]->sources_to.insert(i);
1295 // Compute a topological sort of the hfta_sets.
1297 vector<int> hfta_topsort;
1299 int hnode_srcs[hfta_sets.size()];
1300 for(i=0;i<hfta_sets.size();++i){
1302 if(hfta_sets[i]->sources_to.size() == 0)
1306 while(! workq.empty()){
1307 int node = workq.front();
1309 hfta_topsort.push_back(node);
1310 set<int>::iterator stsi;
1311 for(stsi=hfta_sets[node]->reads_from.begin();stsi!=hfta_sets[node]->reads_from.end();++stsi){
1312 int parent = (*stsi);
1313 hnode_srcs[parent]++;
1314 if(hnode_srcs[parent] == hfta_sets[parent]->sources_to.size()){
1315 workq.push_back(parent);
1320 // Decorate hfta nodes with the level of parallelism given as input.
1322 map<string, int>::iterator msii;
1323 for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1324 string hfta_name = (*msii).first;
1325 int par = (*msii).second;
1326 if(hfta_name_map.count(hfta_name) > 0){
1327 hfta_sets[ hfta_name_map[hfta_name] ]->n_parallel = par;
1329 fprintf(stderr,"Warning, hfta_parallelism.cfg has an entry for %s, but its not a hfta.\n",hfta_name.c_str());
1333 // Propagate levels of parallelism: children should have a level of parallelism
1334 // as large as any of its parents. Adjust children upwards to compensate.
1335 // Start at parents and adjust children, auto-propagation will occur.
1337 for(i=hfta_sets.size()-1;i>=0;i--){
1338 set<int>::iterator stsi;
1339 for(stsi=hfta_sets[i]->reads_from.begin();stsi!=hfta_sets[i]->reads_from.end();++stsi){
1340 if(hfta_sets[i]->n_parallel > hfta_sets[ (*stsi) ]->n_parallel){
1341 hfta_sets[ (*stsi) ]->n_parallel = hfta_sets[i]->n_parallel;
1346 // Before all the name mangling, check if there are any output_spec.cfg
1347 // or hfta_parallelism.cfg entries that do not have a matching query.
1349 string dangling_ospecs = "";
1350 for(msii=qname_to_ospec.begin();msii!=qname_to_ospec.end();++msii){
1351 string oq = (*msii).first;
1352 if(hfta_name_map.count(oq) == 0){
1353 dangling_ospecs += " "+(*msii).first;
1356 if(dangling_ospecs!=""){
1357 fprintf(stderr,"WARNING, the following entries in output_spec.cfg don't have a matching query: %s\n",dangling_ospecs.c_str());
1361 string dangling_par = "";
1362 for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1363 string oq = (*msii).first;
1364 if(hfta_name_map.count(oq) == 0){
1365 dangling_par += " "+(*msii).first;
1368 if(dangling_par!=""){
1369 fprintf(stderr,"WARNING, the following entries in hfta_parallelism.cfg don't have a matching query: %s\n",dangling_par.c_str());
1374 // Replicate parallelized hftas. Do __copyX name mangling. Adjust
1375 // FROM clauses: retarget any name which is an internal node, and
1376 // any which is in hfta_sets (and which is parallelized). Add Merge nodes
1377 // when the source hfta has more parallelism than the target node.
1378 // Add new nodes to qnodes and hfta_nodes with the appropriate linkages.
1381 int n_original_hfta_sets = hfta_sets.size();
1382 for(i=0;i<n_original_hfta_sets;++i){
1383 if(hfta_sets[i]->n_parallel > 1){
1384 hfta_sets[i]->do_generation =false; // set the deletion flag for this entry.
1385 set<string> local_nodes; // names of query nodes in the hfta.
1386 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1387 local_nodes.insert(qnodes[ hfta_sets[i]->query_node_indices[h] ]->name);
1390 for(p=0;p<hfta_sets[i]->n_parallel;++p){
1391 string mangler = "__copy"+int_to_string(p);
1392 hfta_node *par_hfta = new hfta_node();
1393 par_hfta->name = hfta_sets[i]->name + mangler;
1394 par_hfta->source_name = hfta_sets[i]->name;
1395 par_hfta->is_udop = hfta_sets[i]->is_udop;
1396 par_hfta->inferred_visible_node = hfta_sets[i]->inferred_visible_node;
1397 par_hfta->n_parallel = hfta_sets[i]->n_parallel;
1398 par_hfta->parallel_idx = p;
1400 map<string, int> par_qnode_map; // qnode name-to-idx, aids dependency tracking.
1403 if(hfta_sets[i]->is_udop){
1404 int root = hfta_sets[i]->query_node_indices[0];
1406 string unequal_par_sources;
1407 set<int>::iterator rfsii;
1408 for(rfsii=hfta_sets[i]->reads_from.begin();rfsii!=hfta_sets[i]->reads_from.end();++rfsii){
1409 if(hfta_sets[(*rfsii)]->n_parallel != par_hfta->n_parallel){
1410 unequal_par_sources = hfta_sets[(*rfsii)]->name+" ("+int_to_string(hfta_sets[(*rfsii)]->n_parallel)+") ";
1413 if(unequal_par_sources != ""){
1414 fprintf(stderr,"ERROR, UDOP %s has parallelism %d, but some of its sources have a different parallelism: %s\n",hfta_sets[i]->name.c_str(), hfta_sets[i]->n_parallel, unequal_par_sources.c_str());
1419 vector<string> new_sources;
1420 for(rti=0;rti<qnodes[root]->refd_tbls.size();++rti){
1421 new_sources.push_back(qnodes[root]->refd_tbls[rti]+mangler);
1424 query_node *new_qn = new query_node(qnodes.size(), qnodes[root]->name, Schema);
1425 new_qn->name += mangler;
1426 new_qn->mangler = mangler;
1427 new_qn->refd_tbls = new_sources;
1428 par_hfta->query_node_indices.push_back(qnodes.size());
1429 par_qnode_map[new_qn->name] = qnodes.size();
1430 name_node_map[ new_qn->name ] = qnodes.size();
1431 qnodes.push_back(new_qn);
1433 // regular query node
1434 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1435 int hqn_idx = hfta_sets[i]->query_node_indices[h];
1436 table_exp_t *dup_pt = dup_table_exp(qnodes[hqn_idx]->parse_tree);
1437 // rehome the from clause on mangled names.
1438 // create merge nodes as needed for external sources.
1439 for(f=0;f<dup_pt->fm->tlist.size();++f){
1440 if(local_nodes.count(dup_pt->fm->tlist[f]->schema_name)>0){
1441 dup_pt->fm->tlist[f]->schema_name += mangler;
1442 }else if(hfta_name_map.count(dup_pt->fm->tlist[f]->schema_name)>0){
1443 // Ref's an external HFTA. No parallelism => leave as is. Else, if level of parallelism of the two hftas is the same, rename by the mangler. Else, there must be more sources, so create a merge node.
1444 int other_hidx = hfta_name_map[dup_pt->fm->tlist[f]->schema_name];
1445 if(par_hfta->n_parallel == hfta_sets[other_hidx]->n_parallel){
1446 dup_pt->fm->tlist[f]->schema_name += mangler;
1448 vector<string> src_tbls;
1449 int stride = hfta_sets[other_hidx]->n_parallel / par_hfta->n_parallel;
1451 fprintf(stderr,"INTERNAL ERROR, parent hfta %s has a larger parallelism than its child %s\n",par_hfta->name.c_str(), hfta_sets[other_hidx]->name.c_str());
1454 for(s=0;s<stride;++s){
1455 string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s+stride*p);
1456 src_tbls.push_back(ext_src_name);
1458 table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1459 string merge_node_name = qnodes[hqn_idx]->name+"__copy"+int_to_string(p)+"__merge"+int_to_string(f);
1460 dup_pt->fm->tlist[f]->schema_name = merge_node_name;
1461 // Make a qnode to represent the new merge node
1462 query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1463 qn_pt->refd_tbls = src_tbls;
1464 qn_pt->is_udop = false;
1465 qn_pt->is_externally_visible = false;
1466 qn_pt->inferred_visible_node = false;
1467 par_hfta->query_node_indices.push_back(qnodes.size());
1468 par_qnode_map[merge_node_name] = qnodes.size();
1469 name_node_map[ merge_node_name ] = qnodes.size();
1470 qnodes.push_back(qn_pt);
1474 query_node *new_qn = new query_node(qnodes.size(),qnodes[hqn_idx]->name+"__copy"+int_to_string(p),qnodes[hqn_idx]->file,dup_pt);
1475 for(f=0;f<dup_pt->fm->tlist.size();++f){
1476 new_qn->refd_tbls.push_back(dup_pt->fm->tlist[f]->schema_name);
1478 new_qn->params = qnodes[hqn_idx]->params;
1479 new_qn->is_udop = false;
1480 new_qn->is_externally_visible = qnodes[hqn_idx]->is_externally_visible;
1481 new_qn->inferred_visible_node = qnodes[hqn_idx]->inferred_visible_node;
1482 par_hfta->query_node_indices.insert(par_hfta->query_node_indices.begin(),qnodes.size());
1483 par_qnode_map[new_qn->name] = qnodes.size();
1484 name_node_map[ new_qn->name ] = qnodes.size();
1485 qnodes.push_back(new_qn);
1488 hfta_name_map[par_hfta->name] = hfta_sets.size();
1489 hfta_sets.push_back(par_hfta);
1492 // This hfta isn't being parallelized, but add merge nodes for any parallelized
1494 if(!hfta_sets[i]->is_udop){
1495 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1496 int hqn_idx = hfta_sets[i]->query_node_indices[h];
1497 for(f=0;f<qnodes[hqn_idx]->parse_tree->fm->tlist.size();++f){
1498 if(hfta_name_map.count(qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name)>0){
1499 // Ref's an external HFTA. No parallelism => leave as is. Else, if level of parallelism of the two hftas is the same, rename by the mangler. Else, there must be more sources, so create a merge node.
1500 int other_hidx = hfta_name_map[qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name];
1501 if(hfta_sets[i]->n_parallel != hfta_sets[other_hidx]->n_parallel){
1502 vector<string> src_tbls;
1503 for(s=0;s<hfta_sets[other_hidx]->n_parallel;++s){
1504 string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s);
1505 src_tbls.push_back(ext_src_name);
1507 table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1508 string merge_node_name = qnodes[hqn_idx]->name+"__merge"+int_to_string(f);
1509 qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name = merge_node_name;
1510 // Make a qnode to represent the new merge node
1511 query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1512 qn_pt->refd_tbls = src_tbls;
1513 qn_pt->is_udop = false;
1514 qn_pt->is_externally_visible = false;
1515 qn_pt->inferred_visible_node = false;
1516 hfta_sets[i]->query_node_indices.insert(hfta_sets[i]->query_node_indices.begin(),qnodes.size());
1517 name_node_map[ merge_node_name ] = qnodes.size();
1518 qnodes.push_back(qn_pt);
1527 // Rebuild the reads_from / sources_to lists in the qnodes
1528 for(q=0;q<qnodes.size();++q){
1529 qnodes[q]->reads_from.clear();
1530 qnodes[q]->sources_to.clear();
1532 for(q=0;q<qnodes.size();++q){
1533 for(s=0;s<qnodes[q]->refd_tbls.size();++s){
1534 if(name_node_map.count(qnodes[q]->refd_tbls[s])>0){
1535 int rf = name_node_map[qnodes[q]->refd_tbls[s]];
1536 qnodes[q]->reads_from.insert(rf);
1537 qnodes[rf]->sources_to.insert(q);
1542 // Rebuild the reads_from / sources_to lists in hfta_sets
1543 for(q=0;q<hfta_sets.size();++q){
1544 hfta_sets[q]->reads_from.clear();
1545 hfta_sets[q]->sources_to.clear();
1547 for(q=0;q<hfta_sets.size();++q){
1548 for(s=0;s<hfta_sets[q]->query_node_indices.size();++s){
1549 int node = hfta_sets[q]->query_node_indices[s];
1550 set<int>::iterator rfsii;
1551 for(rfsii=qnodes[node]->reads_from.begin();rfsii!=qnodes[node]->reads_from.end();++rfsii){
1552 if(hfta_name_map.count(qnodes[(*rfsii)]->name)>0){
1553 hfta_sets[q]->reads_from.insert(hfta_name_map[qnodes[(*rfsii)]->name]);
1554 hfta_sets[hfta_name_map[qnodes[(*rfsii)]->name]]->sources_to.insert(q);
1561 for(q=0;q<qnodes.size();++q){
1562 printf("qnode %d reads-from %d:",q,qnodes[q]->reads_from.size());
1563 set<int>::iterator rsii;
1564 for(rsii=qnodes[q]->reads_from.begin();rsii!=qnodes[q]->reads_from.end();++rsii)
1565 printf(" %d",(*rsii));
1566 printf(", and sources-to %d:",qnodes[q]->sources_to.size());
1567 for(rsii=qnodes[q]->sources_to.begin();rsii!=qnodes[q]->sources_to.end();++rsii)
1568 printf(" %d",(*rsii));
1572 for(q=0;q<hfta_sets.size();++q){
1573 if(hfta_sets[q]->do_generation==false)
1575 printf("hfta %d (%s) reads-from %d:",q,hfta_sets[q]->name.c_str(),hfta_sets[q]->reads_from.size());
1576 set<int>::iterator rsii;
1577 for(rsii=hfta_sets[q]->reads_from.begin();rsii!=hfta_sets[q]->reads_from.end();++rsii)
1578 printf(" %d",(*rsii));
1579 printf(", and sources-to %d:",hfta_sets[q]->sources_to.size());
1580 for(rsii=hfta_sets[q]->sources_to.begin();rsii!=hfta_sets[q]->sources_to.end();++rsii)
1581 printf(" %d",(*rsii));
1588 // Re-topo sort the hftas
1589 hfta_topsort.clear();
1591 int hnode_srcs_2[hfta_sets.size()];
1592 for(i=0;i<hfta_sets.size();++i){
1593 hnode_srcs_2[i] = 0;
1594 if(hfta_sets[i]->sources_to.size() == 0 && hfta_sets[i]->do_generation){
1599 while(workq.empty() == false){
1600 int node = workq.front();
1602 hfta_topsort.push_back(node);
1603 set<int>::iterator stsii;
1604 for(stsii=hfta_sets[node]->reads_from.begin();stsii!=hfta_sets[node]->reads_from.end();++stsii){
1605 int child = (*stsii);
1606 hnode_srcs_2[child]++;
1607 if(hnode_srcs_2[child] == hfta_sets[child]->sources_to.size()){
1608 workq.push_back(child);
1613 // Ensure that all of the query_node_indices in hfta_sets are topologically
1614 // sorted, don't rely on assumptions that all transforms maintain some kind of order.
1615 for(i=0;i<hfta_sets.size();++i){
1616 if(hfta_sets[i]->do_generation){
1617 map<int,int> n_accounted;
1618 vector<int> new_order;
1620 vector<int>::iterator vii;
1621 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1622 n_accounted[(*vii)]= 0;
1624 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1625 set<int>::iterator rfsii;
1626 for(rfsii=qnodes[(*vii)]->reads_from.begin();rfsii!=qnodes[(*vii)]->reads_from.end();++rfsii){
1627 if(n_accounted.count((*rfsii)) == 0){
1628 n_accounted[(*vii)]++;
1631 if(n_accounted[(*vii)] == qnodes[(*vii)]->reads_from.size()){
1632 workq.push_back((*vii));
1636 while(workq.empty() == false){
1637 int node = workq.front();
1639 new_order.push_back(node);
1640 set<int>::iterator stsii;
1641 for(stsii=qnodes[node]->sources_to.begin();stsii!=qnodes[node]->sources_to.end();++stsii){
1642 if(n_accounted.count((*stsii))){
1643 n_accounted[(*stsii)]++;
1644 if(n_accounted[(*stsii)] == qnodes[(*stsii)]->reads_from.size()){
1645 workq.push_back((*stsii));
1650 hfta_sets[i]->query_node_indices = new_order;
1658 /// Global checking is done, start the analysis and translation
1659 /// of the query parse tree in the order specified by process_order
1662 // Get a list of the LFTAs for global lfta optimization
1663 // TODO: separate building operators from spliting lftas,
1664 // that will make optimizations such as predicate pushing easier.
1665 vector<stream_query *> lfta_list;
1666 stream_query *rootq;
1669 map<string, map<int, string> > schema_of_schemaid; // ensure that schema IDs are unique to an <interface, schema>
1671 for(qi=hfta_topsort.size()-1;qi>=0;--qi){
1673 int hfta_id = hfta_topsort[qi];
1674 vector<int> curr_list = hfta_sets[hfta_id]->query_node_indices;
1678 // Two possibilities, either its a UDOP, or its a collection of queries.
1679 // if(qnodes[curr_list.back()]->is_udop)
1680 if(hfta_sets[hfta_id]->is_udop){
1681 int node_id = curr_list.back();
1682 int udop_schref = Schema->find_tbl(qnodes[node_id]->file);
1683 opview_entry *opv = new opview_entry();
1685 // Many of the UDOP properties aren't currently used.
1686 opv->parent_qname = "no_parent";
1687 opv->root_name = qnodes[node_id]->name;
1688 opv->view_name = qnodes[node_id]->file;
1690 sprintf(tmpstr,"%s_UDOP%d_%s",qnodes[node_id]->name.c_str(),qi,opv->view_name.c_str());
1691 opv->udop_alias = tmpstr;
1692 opv->mangler = qnodes[node_id]->mangler;
1694 if(opv->mangler != ""){
1695 int new_udop_schref = Schema->add_duplicate_table(opv->view_name,opv->root_name);
1696 Schema->mangle_subq_names(new_udop_schref,opv->mangler);
1699 // This piece of code makes each hfta which refers to the same udop
1700 // reference a distinct running udop. Do this at query optimization time?
1701 // fmtbl->set_udop_alias(opv->udop_alias);
1703 opv->exec_fl = Schema->get_op_prop(udop_schref, string("file"));
1704 opv->liveness_timeout = atoi(Schema->get_op_prop(udop_schref, string("liveness_timeout")).c_str());
1706 vector<subquery_spec *> subq = Schema->get_subqueryspecs(udop_schref);
1708 for(s=0;s<subq.size();++s){
1709 // Validate that the fields match.
1710 subquery_spec *sqs = subq[s];
1711 string subq_name = sqs->name + opv->mangler;
1712 vector<field_entry *> flds = Schema->get_fields(subq_name);
1713 if(flds.size() == 0){
1714 fprintf(stderr,"INTERNAL ERROR: the subquery %s of view %s not found in Schema.\n",subq_name.c_str(), opv->view_name.c_str());
1717 if(flds.size() < sqs->types.size()){
1718 fprintf(stderr,"ERROR: subquery %s of view %s does not have enough fields (%lu found, %lu expected).\n",subq_name.c_str(), opv->view_name.c_str(),flds.size(), sqs->types.size());
1721 bool failed = false;
1722 for(f=0;f<sqs->types.size();++f){
1723 data_type dte(sqs->types[f],sqs->modifiers[f]);
1724 data_type dtf(flds[f]->get_type(),flds[f]->get_modifier_list());
1725 if(! dte.subsumes_type(&dtf) ){
1726 fprintf(stderr,"ERROR: subquery %s of view %s does not have the correct type for field %d (%s found, %s expected).\n",subq_name.c_str(), opv->view_name.c_str(),f,dtf.to_string().c_str(), dte.to_string().c_str());
1730 if(dte.is_temporal() && (dte.get_temporal() != dtf.get_temporal()) ){
1731 string pstr = dte.get_temporal_string();
1732 fprintf(stderr,"ERROR: subquery %s of view %s does not have the expected temporal value %s of field %d.\n",sqs->name.c_str(), opv->view_name.c_str(),pstr.c_str(),f);
1739 /// Validation done, find the subquery, make a copy of the
1740 /// parse tree, and add it to the return list.
1741 for(q=0;q<qnodes.size();++q)
1742 if(qnodes[q]->name == subq_name)
1744 if(q==qnodes.size()){
1745 fprintf(stderr,"INTERNAL ERROR: subquery %s of view %s not found in list of query names.\n",subq_name.c_str(), opv->view_name.c_str());
1751 // Cross-link to from entry(s) in all sourced-to tables.
1752 set<int>::iterator sii;
1753 for(sii=qnodes[curr_list.back()]->sources_to.begin() ;sii!=qnodes[curr_list.back()]->sources_to.end();++sii){
1754 //printf("\tUDOP %s sources_to %d (%s)\n",hfta_sets[hfta_id]->name.c_str(),(*sii),hfta_sets[(*sii)]->name.c_str());
1755 vector<tablevar_t *> tblvars = qnodes[(*sii)]->parse_tree->get_from()->get_table_list();
1757 for(ii=0;ii<tblvars.size();++ii){
1758 if(tblvars[ii]->schema_name == opv->root_name){
1759 tblvars[ii]->set_opview_idx(opviews.size());
1765 opviews.append(opv);
1768 // Analyze the parse trees in this query,
1769 // put them in rootq
1770 // vector<int> curr_list = process_sets[qi];
1773 ////////////////////////////////////////
1776 //printf("Process set %d, has %d queries\n",qi,curr_list.size());
1777 for(qj=0;qj<curr_list.size();++qj){
1779 fprintf(stderr,"Processing query %s (file %s) is_udop = %d\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(),qnodes[i]->is_udop);
1781 // Select the current query parse tree
1782 table_exp_t *fta_parse_tree = qnodes[i]->parse_tree;
1784 // if hfta only, try to fetch any missing schemas
1785 // from the registry (using the print_schema program).
1786 // Here I use a hack to avoid analyzing the query -- all referenced
1787 // tables must be in the from clause
1788 // If there is a problem loading any table, just issue a warning,
1790 tablevar_list_t *fm = fta_parse_tree->get_from();
1791 vector<string> refd_tbls = fm->get_src_tbls(Schema);
1792 // iterate over all referenced tables
1794 for(t=0;t<refd_tbls.size();++t){
1795 int tbl_ref = Schema->get_table_ref(refd_tbls[t]);
1797 if(tbl_ref < 0){ // if this table is not in the Schema
1800 string cmd="print_schema "+refd_tbls[t];
1801 FILE *schema_in = popen(cmd.c_str(), "r");
1802 if(schema_in == NULL){
1803 fprintf(stderr,"ERROR: cannot execute command %s\n",cmd.c_str());
1805 string schema_instr;
1806 while(fgets(tmpstr,TMPSTRLEN,schema_in)){
1807 schema_instr += tmpstr;
1809 fta_parse_result = new fta_parse_t();
1810 strcpy(tmp_schema_str,schema_instr.c_str());
1811 FtaParser_setstringinput(tmp_schema_str);
1812 if(FtaParserparse()){
1813 fprintf(stderr,"FTA parse failed on result from command %s.\n",cmd.c_str());
1815 if( fta_parse_result->tables != NULL){
1817 for(tl=0;tl<fta_parse_result->tables->size();++tl){
1818 Schema->add_table(fta_parse_result->tables->get_table(tl));
1821 fprintf(stderr,"ERROR command %s returned no tables.\n",cmd.c_str());
1826 fprintf(stderr,"ERROR, query %s (file %s) references stream %s, which is neither a PROTOCOL nor an externally visible query stream.\n", qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), refd_tbls[t].c_str());
1834 // Analyze the query.
1835 query_summary_class *qs = analyze_fta(fta_parse_tree, Schema, Ext_fcns, qnodes[i]->name);
1837 fprintf(stderr,"analyze_fta failed on query %s (file %s) \n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str());
1841 stream_query new_sq(qs, Schema);
1842 if(new_sq.error_code){
1843 fprintf(stderr,"ERROR, can't create query node for query %s (file %s):\n%s\n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str(),new_sq.err_str.c_str());
1847 // Add it to the Schema
1848 table_def *output_td = new_sq.get_output_tabledef();
1849 Schema->add_table(output_td);
1851 // Create a query plan from the analyzed parse tree.
1852 // If it's a query referenced via FROM, add it to the stream query.
1854 rootq->add_query(new_sq);
1856 rootq = new stream_query(new_sq);
1857 // have the stream query object inherit properties form the analyzed
1858 // hfta_node object.
1859 rootq->set_nparallel(hfta_sets[hfta_id]->n_parallel,hfta_sets[hfta_id]->parallel_idx);
1860 rootq->n_successors = hfta_sets[hfta_id]->sources_to.size();
1866 // This stream query has all its parts
1867 // Build and optimize it.
1868 //printf("translate_fta: generating plan.\n");
1869 if(rootq->generate_plan(Schema)){
1870 fprintf(stderr,"INTERNAL ERROR: Can't generate query plan for query %s.\n",rootq->query_name.c_str());
1874 // We've found the query plan head, so now add the output operators
1875 if(qname_to_ospec.count(hfta_sets[hfta_id]->source_name)){
1876 pair< multimap<string, int>::iterator, multimap<string, int>::iterator > oset;
1877 multimap<string, int>::iterator mmsi;
1878 oset = qname_to_ospec.equal_range(hfta_sets[hfta_id]->source_name);
1879 for(mmsi=oset.first; mmsi!=oset.second; ++mmsi){
1880 rootq->add_output_operator(output_specs[(*mmsi).second]);
1886 // Perform query splitting if necessary.
1888 vector<stream_query *> split_queries = rootq->split_query(Ext_fcns, Schema, hfta_returned, ifaces_db, n_virtual_interfaces, hfta_sets[hfta_id]->n_parallel, hfta_sets[hfta_id]->parallel_idx);
1891 //for(l=0;l<split_queries.size();++l){
1892 //printf("split query %d is %s\n",l,split_queries[l]->q`uery_name.c_str());
1898 if(split_queries.size() > 0){ // should be at least one component.
1900 // Compute the number of LFTAs.
1901 int n_lfta = split_queries.size();
1902 if(hfta_returned) n_lfta--;
1903 // Check if a schemaId constraint needs to be inserted.
1905 // Process the LFTA components.
1906 for(l=0;l<n_lfta;++l){
1907 if(lfta_names.count(split_queries[l]->query_name) == 0){
1908 // Grab the lfta for global optimization.
1909 vector<tablevar_t *> tvec = split_queries[l]->query_plan[0]->get_input_tbls();
1910 string liface = "_local_";
1911 // string lmach = "";
1912 string lmach = hostname;
1914 liface = tvec[0]->get_interface(); // iface queries have been resolved
1915 if(tvec[0]->get_machine() != ""){
1916 lmach = tvec[0]->get_machine();
1918 fprintf(stderr,"WARNING, lfta %s has empty machine name, using %s\n", split_queries[l]->query_plan[0]->node_name.c_str(), hostname.c_str());
1921 interface_names.push_back(liface);
1922 machine_names.push_back(lmach);
1925 vector<predicate_t *> schemaid_preds;
1926 for(int irv=0;irv<tvec.size();++irv){
1928 string schema_name = tvec[irv]->get_schema_name();
1929 string rvar_name = tvec[irv]->get_var_name();
1930 int schema_ref = tvec[irv]->get_schema_ref();
1933 // interface_names.push_back(liface);
1934 // machine_names.push_back(lmach);
1936 //printf("Machine is %s\n",lmach.c_str());
1938 // Check if a schemaId constraint needs to be inserted.
1939 if(schema_ref<0){ // can result from some kinds of splits
1940 schema_ref = Schema->get_table_ref(schema_name);
1942 int schema_id = Schema->get_schema_id(schema_ref); // id associated with PROTOCOL
1945 iface_t *iface = ifaces_db-> get_interface(lmach, liface, errnum, if_error);
1947 fprintf(stderr,"Error looking up the interface %s on host %s: %s\n",liface.c_str(), lmach.c_str(), if_error.c_str());
1952 if(tvec[irv]->get_interface() != "_local_"){
1953 if(iface->has_multiple_schemas()){
1954 if(schema_id<0){ // invalid schema_id
1955 fprintf(stderr,"Error, schema %s has no schema_id, but is sourced from multi-schema interface %s (ref'd in query %s)\n", schema_name.c_str(), liface.c_str(), split_queries[0]->query_name.c_str());
1958 vector<string> iface_schemas = iface->get_property("Schemas");
1959 if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
1960 fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
1963 // Ensure that in liface, schema_id is used for only one schema
1964 if(schema_of_schemaid.count(liface)==0){
1965 map<int, string> empty_map;
1966 schema_of_schemaid[liface] = empty_map;
1968 if(schema_of_schemaid[liface].count(schema_id)==0){
1969 schema_of_schemaid[liface][schema_id] = schema_name;
1971 if(schema_of_schemaid[liface][schema_id] != schema_name){
1972 fprintf(stderr, "ERROR, For interface %s, schema id %d is used in schemas %s and %s\n", liface.c_str(), schema_id, schema_name.c_str(), schema_of_schemaid[liface][schema_id].c_str());
1976 }else{ // single-schema interface
1977 schema_id = -1; // don't generate schema_id predicate
1978 vector<string> iface_schemas = iface->get_property("Schemas");
1979 if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
1980 fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
1983 if(iface_schemas.size()>1){
1984 fprintf(stderr, "ERROR, interface %s is a single-schema interface, but has %d elements in its Schemas list.\n", liface.c_str(), (int)iface_schemas.size());
1992 // If we need to check the schema_id, insert a predicate into the lfta.
1993 // TODO not just schema_id, the full all_schema_ids set.
1995 colref_t *schid_cr = new colref_t("schemaId");
1996 schid_cr->schema_ref = schema_ref;
1997 schid_cr->table_name = rvar_name;
1998 schid_cr->tablevar_ref = 0;
1999 schid_cr->default_table = false;
2000 scalarexp_t *schid_se = new scalarexp_t(schid_cr);
2001 data_type *schid_dt = new data_type("uint");
2002 schid_se->dt = schid_dt;
2004 string schid_str = int_to_string(schema_id);
2005 literal_t *schid_lit = new literal_t(schid_str.c_str(), LITERAL_INT);
2006 scalarexp_t *lit_se = new scalarexp_t(schid_lit);
2007 lit_se->dt = schid_dt;
2009 predicate_t *schid_pr = new predicate_t(schid_se, "=", lit_se);
2010 vector<cnf_elem *> clist;
2011 make_cnf_from_pr(schid_pr, clist);
2012 analyze_cnf(clist[0]);
2013 clist[0]->cost = 1; // cheap one comparison
2014 // cnf built, now insert it.
2015 split_queries[l]->query_plan[0]->append_to_where(clist[0]);
2017 // Specialized processing
2018 // filter join, get two schemaid preds
2019 string node_type = split_queries[l]->query_plan[0]->node_type();
2020 if(node_type == "filter_join"){
2021 filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
2023 fj->pred_t0.push_back(clist[0]);
2025 fj->pred_t1.push_back(clist[0]);
2027 schemaid_preds.push_back(schid_pr);
2029 // watchlist join, get the first schemaid pred
2030 if(node_type == "watch_join"){
2031 watch_join_qpn *fj = (watch_join_qpn *)split_queries[l]->query_plan[0];
2033 fj->pred_t0.push_back(clist[0]);
2034 schemaid_preds.push_back(schid_pr);
2039 // Specialized processing, currently filter join.
2040 if(schemaid_preds.size()>1){
2041 string node_type = split_queries[l]->query_plan[0]->node_type();
2042 if(node_type == "filter_join"){
2043 filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
2044 predicate_t *filter_pr = new predicate_t("OR", schemaid_preds[0], schemaid_preds[1]);
2045 vector<cnf_elem *> clist;
2046 make_cnf_from_pr(filter_pr, clist);
2047 analyze_cnf(clist[0]);
2048 clist[0]->cost = 1; // cheap one comparison
2049 fj->shared_pred.push_back(clist[0]);
2059 // Set the ht size from the recommendation, if there is one in the rec file
2060 if(lfta_htsize.count(split_queries[l]->query_name)>0){
2061 split_queries[l]->query_plan[0]->set_definition("aggregate_slots",int_to_string(lfta_htsize[split_queries[l]->query_name]));
2065 lfta_names[split_queries[l]->query_name] = lfta_list.size();
2066 split_queries[l]->set_gid(lfta_list.size()); // set lfta global id
2067 lfta_list.push_back(split_queries[l]);
2068 lfta_mach_lists[lmach].push_back(split_queries[l]);
2070 // The following is a hack,
2071 // as I should be generating LFTA code through
2072 // the stream_query object.
2074 split_queries[l]->query_plan[0]->bind_to_schema(Schema);
2076 // split_queries[l]->query_plan[0]->definitions = split_queries[l]->defines;
2079 // Create query description to embed in lfta.c
2080 string lfta_schema_str = split_queries[l]->make_schema();
2081 string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
2083 // get NIC capabilities.
2085 nic_property *nicprop = NULL;
2086 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2087 if(iface_codegen_type.size()){
2088 nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2090 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2095 lfta_val[lmach] += generate_lfta_block(split_queries[l]->query_plan[0], Schema, split_queries[l]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop);
2098 snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema));
2100 // TODO NOTE : I'd like it to be the case that registration_query_names
2101 // are the queries to be registered for subscription.
2102 // but there is some complex bookkeeping here.
2103 registration_query_names.push_back(split_queries[l]->query_name);
2104 mach_query_names[lmach].push_back(registration_query_names.size()-1);
2105 // NOTE: I will assume a 1-1 correspondance between
2106 // mach_query_names[lmach] and lfta_mach_lists[lmach]
2107 // where mach_query_names[lmach][i] contains the index into
2108 // query_names, which names the lfta, and
2109 // mach_query_names[lmach][i] is the stream_query * of the
2110 // corresponding lfta.
2111 // Later, lfta_iface_qnames are the query names matching lfta_iface_lists
2115 // check if lfta is reusable
2116 // FTA is reusable if reusable option is set explicitly or it doesn't have any parameters
2118 bool lfta_reusable = false;
2119 if (split_queries[l]->query_plan[0]->get_val_of_def("reusable") == "yes" ||
2120 split_queries[l]->query_plan[0]->get_param_tbl()->size() == 0) {
2121 lfta_reusable = true;
2123 lfta_reuse_options.push_back(lfta_reusable);
2125 // LFTA will inherit the liveness timeout specification from the containing query
2126 // it is too conservative as lfta are expected to spend less time per tuple
2129 // extract liveness timeout from query definition
2130 int liveness_timeout = atoi(split_queries[l]->query_plan[0]->get_val_of_def("liveness_timeout").c_str());
2131 if (!liveness_timeout) {
2132 // fprintf(stderr, "WARNING: No liveness timeout specified for lfta %s, using default value of %d\n",
2133 // split_queries[l]->query_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2134 liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2136 lfta_liveness_timeouts.push_back(liveness_timeout);
2138 // Add it to the schema
2139 table_def *td = split_queries[l]->get_output_tabledef();
2140 Schema->append_table(td);
2141 //printf("added lfta %d (%s)\n",l,split_queries[l]->query_name.c_str());
2146 // If the output is lfta-only, dump out the query name.
2147 if(split_queries.size() == 1 && !hfta_returned){
2148 if(output_query_names ){
2149 fprintf(query_name_output,"%s L\n",split_queries[0]->query_name.c_str());
2153 fprintf(stderr,"query name is %s\n",split_queries[0]->query_name.c_str());
2158 // output schema summary
2159 if(output_schema_summary){
2160 dump_summary(split_queries[0]);
2166 if(hfta_returned){ // query also has an HFTA component
2167 int hfta_nbr = split_queries.size()-1;
2169 hfta_list.push_back(split_queries[hfta_nbr]);
2171 // report on generated query names
2172 if(output_query_names){
2173 string hfta_name =split_queries[hfta_nbr]->query_name;
2174 fprintf(query_name_output,"%s H\n",hfta_name.c_str());
2175 for(l=0;l<hfta_nbr;++l){
2176 string lfta_name =split_queries[l]->query_name;
2177 fprintf(query_name_output,"%s L\n",lfta_name.c_str());
2181 // fprintf(stderr,"query names are ");
2182 // for(l=0;l<hfta_nbr;++l){
2183 // if(l>0) fprintf(stderr,",");
2184 // string fta_name =split_queries[l]->query_name;
2185 // fprintf(stderr," %s",fta_name.c_str());
2187 // fprintf(stderr,"\n");
2192 fprintf(stderr,"ERROR, query in file %s has a bug.\n",qnodes[i]->file.c_str());
2193 fprintf(stderr,"%s\n",rootq->get_error_str().c_str());
2200 //-----------------------------------------------------------------
2201 // Compute and propagate the SE in PROTOCOL fields compute a field.
2202 //-----------------------------------------------------------------
2204 for(i=0;i<lfta_list.size();i++){
2205 lfta_list[i]->generate_protocol_se(sq_map, Schema);
2206 sq_map[lfta_list[i]->get_sq_name()] = lfta_list[i];
2208 for(i=0;i<hfta_list.size();i++){
2209 hfta_list[i]->generate_protocol_se(sq_map, Schema);
2210 sq_map[hfta_list[i]->get_sq_name()] = hfta_list[i];
2215 //------------------------------------------------------------------------
2216 // Perform individual FTA optimizations
2217 //-----------------------------------------------------------------------
2219 if (partitioned_mode) {
2221 // open partition definition file
2222 string part_fname = config_dir_path + "partition.txt";
2224 FILE* partfd = fopen(part_fname.c_str(), "r");
2226 fprintf(stderr,"ERROR, unable to open partition definition file %s\n",part_fname.c_str());
2229 PartnParser_setfileinput(partfd);
2230 if (PartnParserparse()) {
2231 fprintf(stderr,"ERROR, unable to parse partition definition file %s\n",part_fname.c_str());
2238 print_hftas("preopt_hfta_info.txt", hfta_list, lfta_names, registration_query_names, interface_names);
2240 int num_hfta = hfta_list.size();
2241 for(i=0; i < hfta_list.size(); ++i){
2242 hfta_list[i]->optimize(hfta_list, lfta_names, interface_names, machine_names, Ext_fcns, Schema, ifaces_db, partn_parse_result);
2245 // Add all new hftas to schema
2246 for(i=num_hfta; i < hfta_list.size(); ++i){
2247 table_def *td = hfta_list[i]->get_output_tabledef();
2248 Schema->append_table(td);
2251 print_hftas("postopt_hfta_info.txt", hfta_list, lfta_names, registration_query_names, interface_names);
2255 //------------------------------------------------------------------------
2256 // Do global (cross-fta) optimization
2257 //-----------------------------------------------------------------------
2264 set<string> extra_external_libs;
2266 for(i=0;i<hfta_list.size();++i){ // query also has an HFTA component
2269 // build hfta file name, create output
2270 if(numeric_hfta_flname){
2271 sprintf(tmpstr,"hfta_%d",hfta_count);
2272 hfta_names.push_back(tmpstr);
2273 sprintf(tmpstr,"hfta_%d.cc",hfta_count);
2275 sprintf(tmpstr,"hfta_%s",hfta_list[i]->query_name.c_str());
2276 hfta_names.push_back(tmpstr);
2277 sprintf(tmpstr,"hfta_%s.cc",hfta_list[i]->query_name.c_str());
2279 FILE *hfta_fl = fopen(tmpstr,"w");
2280 if(hfta_fl == NULL){
2281 fprintf(stderr,"ERROR can't open fta output file %s\n",tmpstr);
2284 fprintf(hfta_fl,"%s\n\n",hfta_list[i]->generate_hfta(Schema, Ext_fcns, opviews, distributed_mode).c_str());
2286 // If there is a field verifier, warn about
2287 // lack of compatibility
2288 // NOTE : this code assumes that visible non-lfta queries
2289 // are those at the root of a stream query.
2290 string hfta_comment;
2292 string hfta_namespace;
2293 if(hfta_list[i]->defines.count("comment")>0)
2294 hfta_comment = hfta_list[i]->defines["comment"];
2295 if(hfta_list[i]->defines.count("Comment")>0)
2296 hfta_comment = hfta_list[i]->defines["Comment"];
2297 if(hfta_list[i]->defines.count("COMMENT")>0)
2298 hfta_comment = hfta_list[i]->defines["COMMENT"];
2299 if(hfta_list[i]->defines.count("title")>0)
2300 hfta_title = hfta_list[i]->defines["title"];
2301 if(hfta_list[i]->defines.count("Title")>0)
2302 hfta_title = hfta_list[i]->defines["Title"];
2303 if(hfta_list[i]->defines.count("TITLE")>0)
2304 hfta_title = hfta_list[i]->defines["TITLE"];
2305 if(hfta_list[i]->defines.count("namespace")>0)
2306 hfta_namespace = hfta_list[i]->defines["namespace"];
2307 if(hfta_list[i]->defines.count("Namespace")>0)
2308 hfta_namespace = hfta_list[i]->defines["Namespace"];
2309 if(hfta_list[i]->defines.count("NAMESPACE")>0)
2310 hfta_namespace = hfta_list[i]->defines["NAMESPACE"];
2312 if(field_verifier != NULL){
2314 if(hfta_comment == "")
2315 warning_str += "\tcomment not found.\n";
2317 // Obsolete stuff that Carsten wanted
2318 // if(hfta_title == "")
2319 // warning_str += "\ttitle not found.\n";
2320 // if(hfta_namespace == "")
2321 // warning_str += "\tnamespace not found.\n";
2323 // There is a get_tbl_keys method implemented for qp_nodes;
2324 // integrate it into stream_query, then call it to find keys,
2325 // and annotate fields with their key-ness.
2326 // If there is a "keys" property in the defines block, override anything returned
2327 // from the automated analysis.
2329 vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2331 for(fi=0;fi<flds.size();fi++){
2332 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
2334 if(warning_str != "")
2335 fprintf(stderr,"Warning, in HFTA stream %s:\n%s",
2336 hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(),warning_str.c_str());
2339 // Get the fields in this query
2340 vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2342 // do key processing
2343 string hfta_keys_s = "";
2344 if(hfta_list[i]->defines.count("keys")>0)
2345 hfta_keys_s = hfta_list[i]->defines["keys"];
2346 if(hfta_list[i]->defines.count("Keys")>0)
2347 hfta_keys_s = hfta_list[i]->defines["Keys"];
2348 if(hfta_list[i]->defines.count("KEYS")>0)
2349 hfta_keys_s = hfta_list[i]->defines["KEYS"];
2350 string xtra_keys_s = "";
2351 if(hfta_list[i]->defines.count("extra_keys")>0)
2352 xtra_keys_s = hfta_list[i]->defines["extra_keys"];
2353 if(hfta_list[i]->defines.count("Extra_Keys")>0)
2354 xtra_keys_s = hfta_list[i]->defines["Extra_Keys"];
2355 if(hfta_list[i]->defines.count("EXTRA_KEYS")>0)
2356 xtra_keys_s = hfta_list[i]->defines["EXTRA_KEYS"];
2358 vector<string> hfta_keys;
2359 vector<string> partial_keys;
2360 vector<string> xtra_keys;
2361 if(hfta_keys_s==""){
2362 hfta_keys = hfta_list[i]->get_tbl_keys(partial_keys);
2363 if(xtra_keys_s.size()>0){
2364 xtra_keys = split_string(xtra_keys_s, ',');
2366 for(int xi=0;xi<xtra_keys.size();++xi){
2367 if(find(hfta_keys.begin(), hfta_keys.end(), xtra_keys[xi]) == hfta_keys.end()){
2368 hfta_keys.push_back(xtra_keys[xi]);
2372 hfta_keys = split_string(hfta_keys_s, ',');
2374 // validate that all of the keys exist in the output.
2375 // (exit on error, as it's a bad specification)
2376 vector<string> missing_keys;
2377 for(int ki=0;ki<hfta_keys.size(); ++ki){
2379 for(fi=0;fi<flds.size();++fi){
2380 if(hfta_keys[ki] == flds[fi]->get_name())
2384 missing_keys.push_back(hfta_keys[ki]);
2386 if(missing_keys.size()>0){
2387 fprintf(stderr, "Error, HFTA stream %s has keys defined which aren't in the output:", hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
2388 for(int hi=0; hi<missing_keys.size(); ++hi){
2389 fprintf(stderr," %s", missing_keys[hi].c_str());
2391 fprintf(stderr,"\n");
2395 fprintf(qtree_output,"\t<HFTA name='%s'>\n",hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
2396 if(hfta_comment != "")
2397 fprintf(qtree_output,"\t\t<Description value='%s' />\n",hfta_comment.c_str());
2398 if(hfta_title != "")
2399 fprintf(qtree_output,"\t\t<Title value='%s' />\n",hfta_title.c_str());
2400 if(hfta_namespace != "")
2401 fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",hfta_namespace.c_str());
2402 fprintf(qtree_output,"\t\t<FileName value='%s' />\n",tmpstr);
2403 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2405 // write info about fields to qtree.xml
2407 for(fi=0;fi<flds.size();fi++){
2408 fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
2409 if(flds[fi]->get_modifier_list()->size()){
2410 fprintf(qtree_output,"mods='%s'", flds[fi]->get_modifier_list()->to_string().c_str());
2412 fprintf(qtree_output," />\n");
2415 for(int hi=0;hi<hfta_keys.size();++hi){
2416 fprintf(qtree_output, "\t\t<Key value='%s'/>\n",hfta_keys[hi].c_str());
2418 for(int hi=0;hi<partial_keys.size();++hi){
2419 fprintf(qtree_output, "\t\t<PartialKey value='%s'/>\n",partial_keys[hi].c_str());
2421 for(int hi=0;hi<xtra_keys.size();++hi){
2422 fprintf(qtree_output, "\t\t<ExtraKey value='%s'/>\n",xtra_keys[hi].c_str());
2426 // extract liveness timeout from query definition
2427 int liveness_timeout = atoi(hfta_list[i]->query_plan[hfta_list[i]->qhead]->get_val_of_def("liveness_timeout").c_str());
2428 if (!liveness_timeout) {
2429 // fprintf(stderr, "WARNING: No liveness timeout specified for hfta %s, using default value of %d\n",
2430 // hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2431 liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2433 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", liveness_timeout);
2435 vector<tablevar_t *> tmp_tv = hfta_list[i]->get_input_tables();
2437 for(itv=0;itv<tmp_tv.size();++itv){
2438 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",tmp_tv[itv]->get_schema_name().c_str());
2440 string ifrs = hfta_list[i]->collect_refd_ifaces();
2442 fprintf(qtree_output,"\t\t<Ifaces_referenced value='%s'/>\n",ifrs.c_str());
2444 fprintf(qtree_output,"\t</HFTA>\n");
2448 // debug only -- do code generation to catch generation-time errors.
2449 hfta_list[i]->generate_hfta(Schema, Ext_fcns,opviews, distributed_mode);
2452 hfta_count++; // for hfta file names with numeric suffixes
2454 hfta_list[i]->get_external_libs(extra_external_libs);
2458 string ext_lib_string;
2459 set<string>::iterator ssi_el;
2460 for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_el)
2461 ext_lib_string += (*ssi_el)+" ";
2465 // Report on the set of operator views
2466 for(i=0;i<opviews.size();++i){
2467 opview_entry *opve = opviews.get_entry(i);
2468 fprintf(qtree_output,"\t<UDOP name='%s' >\n",opve->view_name.c_str());
2469 fprintf(qtree_output,"\t\t<FileName value='%s' />\n",opve->exec_fl.c_str());
2470 fprintf(qtree_output,"\t\t<Parent value='%s' />\n",opve->root_name.c_str());
2471 fprintf(qtree_output,"\t\t<Alias value='%s' />\n",opve->udop_alias.c_str());
2472 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2474 if (!opve->liveness_timeout) {
2475 // fprintf(stderr, "WARNING: No liveness timeout specified for view %s, using default value of %d\n",
2476 // opve->view_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2477 opve->liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2479 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", opve->liveness_timeout);
2481 for(j=0;j<opve->subq_names.size();j++)
2482 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",opve->subq_names[j].c_str());
2483 fprintf(qtree_output,"\t</UDOP>\n");
2487 //-----------------------------------------------------------------
2489 // Create interface-specific meta code files.
2490 // first, open and parse the interface resources file.
2491 ifaces_db = new ifq_t();
2493 if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
2494 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
2495 ifx_fname.c_str(), ierr.c_str());
2499 map<string, vector<stream_query *> >::iterator svsi;
2500 for(svsi=lfta_mach_lists.begin(); svsi!=lfta_mach_lists.end(); ++svsi){
2501 string lmach = (*svsi).first;
2503 // For this machine, create a set of lftas per interface.
2504 vector<stream_query *> mach_lftas = (*svsi).second;
2505 map<string, vector<stream_query *> > lfta_iface_lists;
2507 for(li=0;li<mach_lftas.size();++li){
2508 vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
2509 string lfta_iface = "_local_";
2511 string lfta_iface = tvec[0]->get_interface();
2513 lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2516 map<string, vector<stream_query *> >::iterator lsvsi;
2517 for(lsvsi=lfta_iface_lists.begin(); lsvsi!=lfta_iface_lists.end(); ++lsvsi){
2519 string liface = (*lsvsi).first;
2520 vector<stream_query *> iface_lftas = (*lsvsi).second;
2521 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2522 if(iface_codegen_type.size()){
2523 nic_property *nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2525 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2528 string mcs = generate_nic_code(iface_lftas, nicprop);
2531 mcf_flnm = lmach + "_"+liface+".mcf";
2533 mcf_flnm = hostname + "_"+liface+".mcf";
2535 if((mcf_fl = fopen(mcf_flnm.c_str(),"w")) == NULL){
2536 fprintf(stderr,"Can't open output file %s\n%s\n",mcf_flnm.c_str(),strerror(errno));
2539 fprintf(mcf_fl,"%s",mcs.c_str());
2541 //printf("mcs of machine %s, iface %s of type %s is \n%s\n",
2542 //lmach.c_str(), liface.c_str(),iface_codegen_type[0].c_str(), mcs.c_str());
2551 //-----------------------------------------------------------------
2554 // Find common filter predicates in the LFTAs.
2555 // in addition generate structs to store the temporal attributes unpacked by prefilter
2557 map<string, vector<stream_query *> >::iterator ssqi;
2558 for(ssqi=lfta_mach_lists.begin(); ssqi!=lfta_mach_lists.end(); ++ssqi){
2560 string lmach = (*ssqi).first;
2561 bool packed_return = false;
2565 // The LFTAs of this machine.
2566 vector<stream_query *> mach_lftas = (*ssqi).second;
2567 // break up on a per-interface basis.
2568 map<string, vector<stream_query *> > lfta_iface_lists;
2569 map<string, vector<int> > lfta_iface_qname_ix; // need the query name
2571 for(li=0;li<mach_lftas.size();++li){
2572 vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
2573 string lfta_iface = "_local_";
2575 lfta_iface = tvec[0]->get_interface();
2577 lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2578 lfta_iface_qname_ix[lfta_iface].push_back(mach_query_names[lmach][li]);
2582 // Are the return values "packed"?
2583 // This should be done on a per-interface basis.
2584 // But this is defunct code for gs-lite
2585 for(li=0;li<mach_lftas.size();++li){
2586 vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
2587 string liface = "_local_";
2589 liface = tvec[0]->get_interface();
2591 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2592 if(iface_codegen_type.size()){
2593 if(npdb->get_nic_prop_val(iface_codegen_type[0],"Return",erri) == "Packed"){
2594 packed_return = true;
2600 // Separate lftas by interface, collect results on a per-interface basis.
2602 vector<cnf_set *> no_preds; // fallback if there is no prefilter
2603 map<string, vector<cnf_set *> > prefilter_preds;
2604 set<unsigned int> pred_ids; // this can be global for all interfaces
2605 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2606 string liface = (*mvsi).first;
2607 vector<cnf_set *> empty_list;
2608 prefilter_preds[liface] = empty_list;
2609 if(! packed_return){
2610 get_common_lfta_filter(lfta_iface_lists[liface], Schema,Ext_fcns, prefilter_preds[liface], pred_ids);
2613 // get NIC capabilities. (Is this needed?)
2614 nic_property *nicprop = NULL;
2615 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2616 if(iface_codegen_type.size()){
2617 nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2619 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2626 // Now that we know the prefilter preds, generate the lfta code.
2627 // Do this for all lftas in this machine.
2628 for(li=0;li<mach_lftas.size();++li){
2629 set<unsigned int> subsumed_preds;
2630 set<unsigned int>::iterator sii;
2632 for(sii=pred_ids.begin();sii!=pred_ids.end();++sii){
2634 if((pid>>16) == li){
2635 subsumed_preds.insert(pid & 0xffff);
2639 string lfta_schema_str = mach_lftas[li]->make_schema();
2640 string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
2641 nic_property *nicprop = NULL; // no NIC properties?
2642 lfta_val[lmach] += generate_lfta_block(mach_lftas[li]->query_plan[0], Schema, mach_lftas[li]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop,subsumed_preds);
2646 // generate structs to store the temporal attributes
2647 // unpacked by prefilter
2648 col_id_set temp_cids;
2649 get_prefilter_temporal_cids(lfta_mach_lists[lmach], temp_cids);
2650 lfta_prefilter_val[lmach] = generate_lfta_prefilter_struct(temp_cids, Schema);
2652 // Compute the lfta bit signatures and the lfta colrefs
2653 // do this on a per-interface basis
2655 lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2657 map<string, vector<long long int> > lfta_sigs; // used again later
2658 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2659 string liface = (*mvsi).first;
2660 vector<long long int> empty_list;
2661 lfta_sigs[liface] = empty_list;
2663 vector<col_id_set> lfta_cols;
2664 vector<int> lfta_snap_length;
2665 for(li=0;li<lfta_iface_lists[liface].size();++li){
2666 unsigned long long int mask=0, bpos=1;
2668 for(f_pos=0;f_pos<prefilter_preds[liface].size();++f_pos){
2669 if(prefilter_preds[liface][f_pos]->lfta_id.count(li))
2673 lfta_sigs[liface].push_back(mask);
2674 lfta_cols.push_back(lfta_iface_lists[liface][li]->query_plan[0]->get_colrefs(true,Schema));
2675 lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema));
2678 //for(li=0;li<mach_lftas.size();++li){
2679 //printf("lfta %d, mask is %llu\n",li,lfta_sigs[li]);
2680 //col_id_set::iterator tcisi;
2681 //for(tcisi=lfta_cols[li].begin(); tcisi!=lfta_cols[li].end();++tcisi){
2682 //printf("\t%s %d (%d)\n",(*tcisi).field.c_str(),(*tcisi).schema_ref, (*tcisi).tblvar_ref);
2687 // generate the prefilter
2688 // Do this on a per-interface basis, except for the #define
2690 // lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2691 lfta_val[lmach] += generate_lfta_prefilter(prefilter_preds[liface], temp_cids, Schema, Ext_fcns, lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2693 lfta_val[lmach] += generate_lfta_prefilter(no_preds, temp_cids, Schema,Ext_fcns, lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2698 // Generate interface parameter lookup function
2699 lfta_val[lmach] += "// lookup interface properties by name\n";
2700 lfta_val[lmach] += "// mulitple values of given property returned in the same string separated by commas\n";
2701 lfta_val[lmach] += "// returns NULL if given property does not exist\n";
2702 lfta_val[lmach] += "gs_sp_t get_iface_properties (const gs_sp_t iface_name, const gs_sp_t property_name) {\n";
2704 // collect a list of interface names used by queries running on this host
2705 set<std::string> iface_names;
2706 for(i=0;i<mach_query_names[lmach].size();i++){
2707 int mi = mach_query_names[lmach][i];
2708 stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2710 if(interface_names[mi]=="")
2711 iface_names.insert("DEFAULTDEV");
2713 iface_names.insert(interface_names[mi]);
2716 // generate interface property lookup code for every interface
2717 set<std::string>::iterator sir;
2718 for (sir = iface_names.begin(); sir != iface_names.end(); sir++) {
2719 if (sir == iface_names.begin())
2720 lfta_val[lmach] += "\tif (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2722 lfta_val[lmach] += "\t} else if (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2724 // iterate through interface properties
2725 vector<string> iface_properties;
2726 if(*sir!="_local_"){ // dummy watchlist interface, don't process.
2727 iface_properties = ifaces_db->get_iface_properties(lmach,*sir,erri,err_str);
2730 fprintf(stderr,"ERROR cannot retrieve interface properties for %s.%s, %s\n",lmach.c_str(), sir->c_str(), err_str.c_str());
2733 if (iface_properties.empty())
2734 lfta_val[lmach] += "\t\treturn NULL;\n";
2736 for (int i = 0; i < iface_properties.size(); ++i) {
2738 lfta_val[lmach] += "\t\tif (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2740 lfta_val[lmach] += "\t\t} else if (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2742 // combine all values for the interface property using comma separator
2743 vector<string> vals = ifaces_db->get_iface_vals(lmach,*sir,iface_properties[i],erri,err_str);
2744 lfta_val[lmach] += "\t\t\treturn \"";
2745 for (int j = 0; j < vals.size(); ++j) {
2746 lfta_val[lmach] += vals[j];
2747 if (j != vals.size()-1)
2748 lfta_val[lmach] += ",";
2750 lfta_val[lmach] += "\";\n";
2752 lfta_val[lmach] += "\t\t} else\n";
2753 lfta_val[lmach] += "\t\t\treturn NULL;\n";
2756 lfta_val[lmach] += "\t} else\n";
2757 lfta_val[lmach] += "\t\treturn NULL;\n";
2758 lfta_val[lmach] += "}\n\n";
2761 // Generate a full list of FTAs for clearinghouse reference
2762 lfta_val[lmach] += "// list of FTAs clearinghouse expects to register themselves\n";
2763 lfta_val[lmach] += "const gs_sp_t fta_names[] = {";
2766 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2767 string liface = (*mvsi).first;
2768 if(liface != "_local_"){ // these don't register themselves
2769 vector<stream_query *> lfta_list = (*mvsi).second;
2770 for(i=0;i<lfta_list.size();i++){
2771 int mi = lfta_iface_qname_ix[liface][i];
2772 if(first) first = false;
2773 else lfta_val[lmach] += ", ";
2774 lfta_val[lmach] += "\"" + registration_query_names[mi] + "\"";
2778 // for (i = 0; i < registration_query_names.size(); ++i) {
2780 // lfta_val[lmach] += ", ";
2781 // lfta_val[lmach] += "\"" + registration_query_names[i] + "\"";
2784 for (i = 0; i < hfta_list.size(); ++i) {
2785 lfta_val[lmach] += ", \"" + hfta_list[i]->query_name + "\"";
2787 lfta_val[lmach] += ", NULL};\n\n";
2790 // Add the initialization function to lfta.c
2791 // Change to accept the interface name, and
2792 // set the prefilter function accordingly.
2793 // see the example in demo/err2
2794 lfta_val[lmach] += "void fta_init(gs_sp_t device){\n";
2795 lfta_val[lmach] += "// note: the last parameter in fta_register is the prefilter signature\n";
2797 // for(i=0;i<mach_query_names[lmach].size();i++)
2798 // int mi = mach_query_names[lmach][i];
2799 // stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2801 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2802 string liface = (*mvsi).first;
2803 vector<stream_query *> lfta_list = (*mvsi).second;
2804 for(i=0;i<lfta_list.size();i++){
2805 stream_query *lfta_sq = lfta_list[i];
2806 int mi = lfta_iface_qname_ix[liface][i];
2808 if(liface == "_local_"){
2809 // Don't register an init function, do the init code inline
2810 lfta_val[lmach] += "// initialization code for "+registration_query_names[mi]+";\n";
2811 lfta_val[lmach] += "\t"+generate_watchlist_name(registration_query_names[mi])+".filename = "+generate_watchlist_name(registration_query_names[mi])+"__fstr;\n";
2815 fprintf(stderr,"interface %s, LFTA %d, snap length is %d\n",liface.c_str(),mi,snap_lengths[mi]);
2817 string this_iface = "DEFAULTDEV";
2818 if(interface_names[mi]!="")
2819 this_iface = '"'+interface_names[mi]+'"';
2820 lfta_val[lmach] += "\tif(!strcmp(device,"+this_iface+"))\n";
2821 lfta_val[lmach] += "\t\tfta_register(\""+registration_query_names[mi]+"\", " + (lfta_reuse_options[mi]?"1":"0") + ", ";
2822 // if(interface_names[mi]=="")
2823 // lfta_val[lmach]+="DEFAULTDEV";
2825 // lfta_val[lmach]+='"'+interface_names[mi]+'"';
2826 lfta_val[lmach] += this_iface;
2829 lfta_val[lmach] += ", "+generate_alloc_name(registration_query_names[mi])
2830 +"\n#ifndef LFTA_IN_NIC\n\t,"+generate_schema_string_name(registration_query_names[mi])
2832 sprintf(tmpstr,",%d",snap_lengths[mi]);
2833 lfta_val[lmach] += tmpstr;
2835 // unsigned long long int mask=0, bpos=1;
2837 // for(f_pos=0;f_pos<prefilter_preds.size();++f_pos){
2838 // if(prefilter_preds[f_pos]->lfta_id.count(i))
2840 // bpos = bpos << 1;
2844 // sprintf(tmpstr,",%lluull",mask);
2845 sprintf(tmpstr,",%lluull",lfta_sigs[liface][i]);
2846 lfta_val[lmach]+=tmpstr;
2848 lfta_val[lmach] += ",0ull";
2851 lfta_val[lmach] += ");\n";
2855 // End of lfta prefilter stuff
2856 // --------------------------------------------------
2858 // If there is a field verifier, warn about
2859 // lack of compatibility
2860 string lfta_comment;
2862 string lfta_namespace;
2863 map<string,string> ldefs = lfta_sq->query_plan[0]->get_definitions();
2864 if(ldefs.count("comment")>0)
2865 lfta_comment = lfta_sq->defines["comment"];
2866 if(ldefs.count("Comment")>0)
2867 lfta_comment = lfta_sq->defines["Comment"];
2868 if(ldefs.count("COMMENT")>0)
2869 lfta_comment = lfta_sq->defines["COMMENT"];
2870 if(ldefs.count("title")>0)
2871 lfta_title = lfta_sq->defines["title"];
2872 if(ldefs.count("Title")>0)
2873 lfta_title = lfta_sq->defines["Title"];
2874 if(ldefs.count("TITLE")>0)
2875 lfta_title = lfta_sq->defines["TITLE"];
2876 if(ldefs.count("NAMESPACE")>0)
2877 lfta_namespace = lfta_sq->defines["NAMESPACE"];
2878 if(ldefs.count("Namespace")>0)
2879 lfta_namespace = lfta_sq->defines["Namespace"];
2880 if(ldefs.count("namespace")>0)
2881 lfta_namespace = lfta_sq->defines["namespace"];
2883 string lfta_ht_size;
2884 if(lfta_sq->query_plan[0]->node_type()== "sgah_qpn" || lfta_sq->query_plan[0]->node_type()== "rsgah_qpn")
2885 lfta_ht_size = int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE);
2886 if(ldefs.count("aggregate_slots")>0){
2887 lfta_ht_size = ldefs["aggregate_slots"];
// Run the optional field verifier over visible lftas only (names starting
// with '_' are treated as internal) and report any problems to stderr.
2890 // NOTE : I'm assuming that visible lftas do not start with _fta.
2891 // -- will fail for non-visible simple selection queries.
2892 if(field_verifier != NULL && registration_query_names[mi].substr(0,1) != "_"){
// A missing comment definition is only a warning, not an error.
2894 if(lfta_comment == "")
2895 warning_str += "\tcomment not found.\n";
2896 // Obsolete stuff that carsten wanted
2897 // if(lfta_title == "")
2898 // warning_str += "\ttitle not found.\n";
2899 // if(lfta_namespace == "")
2900 // warning_str += "\tnamespace not found.\n";
// Verify each output field's name/type; verify_field appends any
// complaints to warning_str.
2902 vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
2904 for(fi=0;fi<flds.size();fi++){
2905 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
// Emit all accumulated warnings for this lfta in one message.
2907 if(warning_str != "")
2908 fprintf(stderr,"Warning, in LFTA stream %s:\n%s",
2909 registration_query_names[mi].c_str(),warning_str.c_str());
// Emit one <LFTA> element into qtree.xml describing this query:
// metadata, host, interface, input tables, rate, liveness, and fields.
2913 // Create qtree output
2914 fprintf(qtree_output,"\t<LFTA name='%s' >\n",registration_query_names[mi].c_str());
// Optional child elements are emitted only when the definition was present.
2915 if(lfta_comment != "")
2916 fprintf(qtree_output,"\t\t<Description value='%s' />\n",lfta_comment.c_str());
2917 if(lfta_title != "")
2918 fprintf(qtree_output,"\t\t<Title value='%s' />\n",lfta_title.c_str());
2919 if(lfta_namespace != "")
2920 fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",lfta_namespace.c_str());
2921 if(lfta_ht_size != "")
2922 fprintf(qtree_output,"\t\t<HtSize value='%s' />\n",lfta_ht_size.c_str());
// Host element: lmach vs. hostname — presumably an if/else on whether a
// machine name was given (the branch lines are elided here; confirm).
2924 fprintf(qtree_output,"\t\t<Host value='%s' />\n",lmach.c_str());
2926 fprintf(qtree_output,"\t\t<Host value='%s' />\n",hostname.c_str());
2927 fprintf(qtree_output,"\t\t<Interface value='%s' />\n",interface_names[mi].c_str());
// One ReadsFrom element per input table of the lfta.
2928 std::vector<tablevar_t *> itbls = lfta_sq->get_input_tables();
2929 for(int t=0;t<itbls.size();++t){
2930 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",itbls[t]->get_schema_name().c_str());
2932 // fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",interface_names[mi].c_str());
// Rate is a hard-coded placeholder value of 100.
2933 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2934 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", lfta_liveness_timeouts[mi]);
2935 // write info about fields to qtree.xml
2936 vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
2938 for(fi=0;fi<flds.size();fi++){
2939 fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
// Include the modifier list attribute only when modifiers exist.
2940 if(flds[fi]->get_modifier_list()->size()){
2941 fprintf(qtree_output,"mods='%s' ", flds[fi]->get_modifier_list()->to_string().c_str());
2943 fprintf(qtree_output," />\n");
2945 fprintf(qtree_output,"\t</LFTA>\n");
// Generate C code (appended to lfta_val[lmach]) that selects the proper
// lfta_prefilter_<iface> variable at runtime by strcmp'ing the device name,
// and reports an error if no interface matched.
2951 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2952 string liface = (*mvsi).first;
2954 " if (!strcmp(device, \""+liface+"\")) \n"
2955 " lfta_prefilter = &lfta_prefilter_"+liface+"; \n"
// Generated guard: fail loudly if the device was not recognized.
2959 " if(lfta_prefilter == NULL){\n"
2960 " fprintf(stderr, \"Error, interface %s not found in fta_init.\\n\",device);\n"
// Close the generated function body.
2967 lfta_val[lmach] += "}\n\n";
// Unless running in debug-only or hfta-only mode, write the generated
// lfta C source to <machine-or-host>_lfta.c.
2969 if(!(debug_only || hfta_only) ){
// File name is based on the machine name when given, else the local hostname
// (the if/else lines are elided between these two assignments).
2972 lfta_flnm = lmach + "_lfta.c";
2974 lfta_flnm = hostname + "_lfta.c";
2975 if((lfta_out = fopen(lfta_flnm.c_str(),"w")) == NULL){
// NOTE(review): the error message prints the literal "lfta.c" rather than
// the actual file name in lfta_flnm — misleading when the open fails.
2976 fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
// Emit header, prefilter globals, then the lfta bodies, in that order.
2979 fprintf(lfta_out,"%s",lfta_header.c_str());
2980 fprintf(lfta_out,"%s",lfta_prefilter_val[lmach].c_str());
2981 fprintf(lfta_out,"%s",lfta_val[lmach].c_str());
// Report the external operators the queries rely on, then generate the
// Makefile and close the qtree XML document.
2986 // Say what are the operators which must execute
2987 if(opviews.size()>0)
2988 fprintf(stderr,"The queries use the following external operators:\n");
2989 for(i=0;i<opviews.size();++i){
2990 opview_entry *opv = opviews.get_entry(i);
2991 fprintf(stderr,"\t%s\n",opv->view_name.c_str());
// Hand everything collected so far to the Makefile generator
// (some arguments are on elided lines between these calls).
2995 generate_makefile(input_file_names, nfiles, hfta_names, opviews,
2996 machine_names, schema_file_name,
2998 ifaces_db, config_dir_path,use_pads,ext_lib_string, rts_hload);
// Terminate the qtree.xml document.
3001 fprintf(qtree_output,"</QueryNodes>\n");
3006 ////////////////////////////////////////////////////////////
// generate_makefile: write a Makefile (plus, further below, runit/stopit
// scripts) that builds the rts, the generated lfta C file, and all hftas.
// Parameters (some parameter lines are elided from this view):
//   input_file_names/nfiles : the query source files
//   hfta_names              : executables to build for hftas
//   opviews                 : external operator views
//   machine_names           : machine per interface
//   schema_file_name        : packet schema input
//   ifdb                    : interface-resolution database
//   config_dir_path         : optional config dir; note it is a reference
//                             and is rewritten below ("-C <path>"), so the
//                             caller's string is mutated
//   rts_hload               : per-interface hash-load split
3008 void generate_makefile(vector<string> &input_file_names, int nfiles,
3009 vector<string> &hfta_names, opview_set &opviews,
3010 vector<string> &machine_names,
3011 string schema_file_name,
3012 vector<string> &interface_names,
3013 ifq_t *ifdb, string &config_dir_path,
3016 map<string, vector<int> > &rts_hload
// Convert a non-empty config dir into a "-C <dir>" command-line fragment.
3020 if(config_dir_path != ""){
3021 config_dir_path = "-C "+config_dir_path;
// Probe for optional libraries; stat returns 0 on success, so >= 0
// effectively means "file exists".
3025 bool libz_exists = stat((root_path+"/lib/libz.a").c_str(),&sb) >= 0;
3026 bool libast_exists = stat((root_path+"/lib/libast.a").c_str(),&sb) >= 0;
3028 // if(libz_exists && !libast_exists){
3029 // fprintf(stderr,"Configuration ERROR: ../../lib/libz.a exists but ../../lib/libast.a dows not.\n");
// Collect the distinct executable files backing external operator views.
3033 // Get set of operator executable files to run
3035 set<string>::iterator ssi;
3036 for(i=0;i<opviews.size();++i){
3037 opview_entry *opv = opviews.get_entry(i);
3038 if(opv->exec_fl != "") op_fls.insert(opv->exec_fl);
// Begin writing the Makefile: compiler definitions first.
3041 FILE *outfl = fopen("Makefile", "w");
3043 fprintf(stderr,"Can't open Makefile for write, exiting.\n");
3048 ("CPP= g++ -O3 -g -I "+root_path+"/include -I "+root_path+"/include/hfta\n"
3049 "CC= gcc -O3 -g -I . -I "+root_path+"/include -I "+root_path+"/include/lfta"
3053 fprintf(outfl," -DLFTA_STATS");
3055 // Gather the set of interfaces
3056 // Also, gather "base interface names" for use in computing
3057 // the hash splitting to virtual interfaces.
3058 // TODO : must update to handle machines
3060 set<string> base_vifaces; // base interfaces of virtual interfaces
// Map each interface to its machine; ifattrs declared for attributes below.
3061 map<string, string> ifmachines;
3062 map<string, string> ifattrs;
3063 for(i=0;i<interface_names.size();++i){
3064 ifaces.insert(interface_names[i]);
3065 ifmachines[interface_names[i]] = machine_names[i];
// A trailing "X<suffix>" in an interface name marks a virtual interface;
// the part before the last 'X' is its base interface.
3067 size_t Xpos = interface_names[i].find_last_of("X");
3068 if(Xpos!=string::npos){
3069 string iface = interface_names[i].substr(0,Xpos);
3070 base_vifaces.insert(iface);
3072 // get interface attributes and add them to the list
// Flags for optional link libraries, decided by interface properties below.
3075 // Do we need to include protobuf libraries?
3076 // TODO Move to the interface library: get the libraries to include
3077 // for an interface type
3079 bool use_proto = false;
3080 bool use_bsa = false;
3081 bool use_kafka = false;
// Inspect each interface's properties in the interface DB to decide
// which optional runtime libraries (PROTO / BSA / KAFKA) are needed;
// warn if the runtime was built without the corresponding support.
3084 for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
3085 string ifnm = (*ssi);
3086 vector<string> ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "InterfaceType", erri, err_str);
3087 for(int ift_i=0;ift_i<ift.size();ift_i++){
3088 if(ift[ift_i]=="PROTO"){
3089 #ifdef PROTO_ENABLED
3092 fprintf(stderr,"Runtime libraries built without PROTO support. Rebuild with PROTO_ENABLED defined in gsoptions.h\n");
// BSA flag: any case variant of "true" enables it.
3097 ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "BSA", erri, err_str);
3098 for(int ift_i=0;ift_i<ift.size();ift_i++){
3099 if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){
3103 fprintf(stderr,"Runtime libraries built without BSA support. Rebuild with BSA_ENABLED defined in gsoptions.h\n");
// KAFKA flag: same case-variant check as BSA.
3108 ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "KAFKA", erri, err_str);
3109 for(int ift_i=0;ift_i<ift.size();ift_i++){
3110 if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){
3111 #ifdef KAFKA_ENABLED
3114 fprintf(stderr,"Runtime libraries built without KAFKA support. Rebuild with KAFKA_ENABLED defined in gsoptions.h\n");
// Emit the Makefile rules: the "rts" link rule with its library set,
// the lfta.o / <host>_lfta.c regeneration rules, one build rule per hfta,
// symlink rules for packet_schema.txt and external_fcns.def, and "clean".
3125 for(i=0;i<hfta_names.size();++i)
3126 fprintf(outfl," %s",hfta_names[i].c_str());
// rts link rule: depends on lfta.o and the core GS static libraries.
3130 "rts: lfta.o "+root_path+"/lib/libgscphost.a "+root_path+"/lib/libgscplftaaux.a "+root_path+"/lib/libgscprts.a "+root_path+"/lib/libclearinghouse.a\n"
3131 "\tg++ -O3 -g -o rts lfta.o ").c_str(), outfl);
3133 fprintf(outfl,"-L. ");
3135 ("-L"+root_path+"/lib -lgscplftaaux ").c_str(), outfl);
// Optional PADS support libraries.
3137 fprintf(outfl,"-lgscppads -lpads ");
3139 "-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux -lclearinghouse -lresolv -lpthread -lgscpinterface -lz");
3141 fprintf(outfl, " -lpz -lz -lbz ");
// -last only when both libz and libast were found above.
3142 if(libz_exists && libast_exists)
3143 fprintf(outfl," -last ");
3145 fprintf(outfl, " -ldll -ldl ");
// Conditionally link protobuf-c / BSA / Kafka per the build options.
3147 #ifdef PROTO_ENABLED
3148 fprintf(outfl, " -L/usr/local/lib/ -lprotobuf-c ");
3151 fprintf(outfl, " -lbsa_stream ");
3153 #ifdef KAFKA_ENABLED
3154 fprintf(outfl, " -lrdkafka ");
3156 fprintf(outfl," -lgscpaux");
// Profiling support (gcov arc profiling).
3158 fprintf(outfl," -fprofile-arcs");
// lfta.o rule and the rule that re-runs translate_fta to regenerate
// <host>_lfta.c whenever the inputs change.
3163 "lfta.o: %s_lfta.c\n"
3164 "\t$(CC) -o lfta.o -c %s_lfta.c\n"
3166 "%s_lfta.c: external_fcns.def %s ",hostname.c_str(), hostname.c_str(), hostname.c_str(),schema_file_name.c_str());
3167 for(i=0;i<nfiles;++i)
3168 fprintf(outfl," %s",input_file_names[i].c_str());
// Recipe: invoke translate_fta (with or without -h <host>; the selecting
// branch lines are elided between these two fprintf calls).
3170 fprintf(outfl,"\n\t%s/bin/translate_fta %s %s ",root_path.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
3172 fprintf(outfl,"\n\t%s/bin/translate_fta -h %s %s %s ", root_path.c_str(), hostname.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
3174 for(i=0;i<nfiles;++i)
3175 fprintf(outfl," %s",input_file_names[i].c_str());
3176 fprintf(outfl,"\n");
// One link + compile rule pair per hfta executable.
3178 for(i=0;i<hfta_names.size();++i)
3181 "\t$(CPP) -o %s %s.o -L"+root_path+"/lib -lgscpapp -lgscphostaux -lgscphost -lgscpinterface -lgscphftaaux -lgscphostaux -lm -lgscpaux -lclearinghouse -lresolv -lpthread -lgscpaux -lgscphftaaux -lgscpaux %s\n"
3184 "\t$(CPP) -o %s.o -c %s.cc\n"
3187 hfta_names[i].c_str(), hfta_names[i].c_str(),
3188 hfta_names[i].c_str(), hfta_names[i].c_str(), extra_libs.c_str(),
3189 hfta_names[i].c_str(), hfta_names[i].c_str(),
3190 hfta_names[i].c_str(), hfta_names[i].c_str()
// Symlink rules for the config inputs, then the clean rule.
3195 "packet_schema.txt:\n"
3196 "\tln -s "+root_path+"/cfg/packet_schema.txt .\n"
3198 "external_fcns.def:\n"
3199 "\tln -s "+root_path+"/cfg/external_fcns.def .\n"
3202 "\trm -rf core rts *.o %s_lfta.c external_fcns.def packet_schema.txt").c_str(),hostname.c_str());
3203 for(i=0;i<hfta_names.size();++i)
3204 fprintf(outfl," %s %s.cc",hfta_names[i].c_str(),hfta_names[i].c_str());
3205 fprintf(outfl,"\n");
// Generate the "runit" shell script (start gshub, rts with its interfaces,
// every hfta, and the operator views) and the "stopit" script (kill all
// process groups recorded in gs.pids).
3211 // Gather the set of interfaces
3212 // TODO : must update to handle machines
3213 // TODO : lookup interface attributes and add them as a parameter to rts process
3214 outfl = fopen("runit", "w");
3216 fprintf(stderr,"Can't open runit for write, exiting.\n");
// Script body: launch gshub.py, wait for gshub.log to appear, then read
// the hub address and start rts (lines between the fragments are elided).
3224 +root_path+"/bin/gshub.py> /dev/null 2>&1 &\n"
3226 "if [ ! -f gshub.log ]\n"
3228 "\techo \"Failed to start bin/gshub.py\"\n"
3231 "ADDR=`cat gshub.log`\n"
3232 "ps opgid= $! >> gs.pids\n"
3233 "./rts $ADDR default ").c_str(), outfl);
// Append each interface name (plus its configured Command values) to the
// rts invocation, skipping the internal _local_ interface.
3236 for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
3237 string ifnm = (*ssi);
3238 // suppress internal _local_ interface
3239 if (ifnm == "_local_")
3241 fprintf(outfl, "%s ",ifnm.c_str());
3242 vector<string> ifv = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "Command", erri, err_str);
3243 for(j=0;j<ifv.size();++j)
3244 fprintf(outfl, "%s ",ifv[j].c_str());
3246 fprintf(outfl, " &\n");
3247 fprintf(outfl, "echo $! >> gs.pids\n");
// Launch each hfta executable in the background.
3248 for(i=0;i<hfta_names.size();++i)
3249 fprintf(outfl,"./%s $ADDR default &\n",hfta_names[i].c_str());
// Launch each external operator view with its mangler argument.
3251 for(j=0;j<opviews.opview_list.size();++j){
3252 fprintf(outfl,"%s/views/%s %s &\n",root_path.c_str(),opviews.opview_list[j]->exec_fl.c_str(), opviews.opview_list[j]->mangler.c_str());
3256 system("chmod +x runit");
// stopit: TERM every recorded process group (second pass lines elided,
// presumably a follow-up KILL — confirm against the full source).
3258 outfl = fopen("stopit", "w");
3260 fprintf(stderr,"Can't open stopit for write, exiting.\n");
3264 fprintf(outfl,"#!/bin/sh\n"
3266 "if [ ! -f gs.pids ]\n"
3270 "for pgid in `cat gs.pids`\n"
3272 "kill -TERM -$pgid\n"
3275 "for pgid in `cat gs.pids`\n"
3282 system("chmod +x stopit");
3284 //-----------------------------------------------
// Everything below through the dagconfig emission is inside this block
// comment, i.e. compiled out; the closing */ is on an elided line.
3286 /* For now disable support for virtual interfaces
3287 outfl = fopen("set_vinterface_hash.bat", "w");
3289 fprintf(stderr,"Can't open set_vinterface_hash.bat for write, exiting.\n");
3293 // The format should be determined by an entry in the ifres.xml file,
3294 // but for now hardcode the only example I have.
// For each base virtual interface with a hash-load entry, extract the
// trailing (up to two digit) interface number from its name...
3295 for(ssi=base_vifaces.begin();ssi!=base_vifaces.end();++ssi){
3296 if(rts_hload.count((*ssi))){
3297 string iface_name = (*ssi);
3298 string iface_number = "";
3299 for(j=iface_name.size()-1; j>0 && iface_number == ""; j--){
3300 if(isdigit(iface_name[j])){
3301 iface_number = iface_name[j];
3302 if(j>0 && isdigit(iface_name[j-1]))
3303 iface_number = iface_name[j-1] + iface_number;
// ...then emit a dagconfig hat_range command covering the allocated
// hash ranges for that interface.
3307 fprintf(outfl,"dagconfig -d%s -S hat_range=",iface_number.c_str());
3308 vector<int> halloc = rts_hload[iface_name];
3310 for(j=0;j<halloc.size();++j){
3313 fprintf(outfl,"%d-%d",prev_limit,halloc[j]);
3314 prev_limit = halloc[j];
3316 fprintf(outfl,"\n");
3320 system("chmod +x set_vinterface_hash.bat");
// --- Fragment: build a local schema (qpSchema) for a split query ---
// (the enclosing function's signature is on elided lines; hfta_nbr,
// split_queries, Schema, and input_file_names come from that scope)
3324 // Code for implementing a local schema
3326 table_list qpSchema;
3328 // Load the schemas of any LFTAs.
// Each lower split query contributes its output table definition.
3330 for(l=0;l<hfta_nbr;++l){
3331 stream_query *sq0 = split_queries[l];
3332 table_def *td = sq0->get_output_tabledef();
3333 qpSchema.append_table(td);
3335 // load the schemas of any other ref'd tables.
// Resolve each input table of the top split query: first against the
// local qpSchema, then against the global Schema; unresolved names are
// reported as errors (error-handling lines elided).
3337 vector<tablevar_t *> input_tbl_names = split_queries[hfta_nbr]->get_input_tables();
3339 for(ti=0;ti<input_tbl_names.size();++ti){
3340 int tbl_ref = qpSchema.get_table_ref(input_tbl_names[ti]->get_schema_name());
3342 tbl_ref = Schema->get_table_ref(input_tbl_names[ti]->get_schema_name());
3344 fprintf(stderr,"ERROR file %s references table %s, which is not in the schema.\n",input_file_names[i].c_str(), (input_tbl_names[ti]->get_schema_name()).c_str());
3347 qpSchema.append_table(Schema->get_table(tbl_ref));
3352 // Functions related to parsing.
// split_string: destructively split instr on separator sep, storing word
// pointers into words[] (at most max_words entries). Returns the word
// count (return statement is beyond this view). NOTE: instr is modified
// in place — strchr-located separators are presumably overwritten with
// NULs on elided lines; confirm against the full source.
3355 static int split_string(char *instr,char sep, char **words,int max_words){
3361 words[nwords++] = str;
3362 while( (loc = strchr(str,sep)) != NULL){
// Guard against overrunning the caller's words[] array; the overflowing
// word overwrites the last slot and a warning is printed.
3365 if(nwords >= max_words){
3366 fprintf(stderr,"Error in split_string, too many words discovered (max is %d)\n",max_words);
3367 nwords = max_words-1;
3369 words[nwords++] = str;