1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
16 #include<unistd.h> // for gethostname
19 #include "parse_fta.h"
20 #include "parse_schema.h"
21 #include "parse_ext_fcns.h"
22 #include"analyze_fta.h"
23 #include"query_plan.h"
24 #include"generate_lfta_code.h"
25 #include"stream_query.h"
26 #include"generate_utils.h"
28 #include"generate_nic_code.h"
39 #include <sys/types.h>
45 // to verify that some files exist.
46 #include <sys/types.h>
49 #include "parse_partn.h"
51 #include "print_plan.h"
53 // Interface to the xml parser
56 #include"field_list.h"
60 extern int xmlParserparse(void);
61 extern FILE *xmlParserin;
62 extern int xmlParserdebug;
64 std::vector<std::string> xml_attr_vec;
65 std::vector<std::string> xml_val_vec;
66 std::string xml_a, xml_v;
67 xml_t *xml_leaves = NULL;
69 // Interface to the field list verifier
70 field_list *field_verifier = NULL;
72 #define TMPSTRLEN 1000
75 #define PATH_DELIM '/'
78 char tmp_schema_str[10000];
80 // maximum delay between two heartbeats produced
81 // by a UDOP. Used when it's not explicitly
82 // provided in the UDOP definition
83 #define DEFAULT_UDOP_LIVENESS_TIMEOUT 5
85 // Default lfta hash table size, must be power of 2.
86 int DEFAULT_LFTA_HASH_TABLE_SIZE = 4096;
88 // Interface to FTA definition lexer and parser ...
90 extern int FtaParserparse(void);
91 extern FILE *FtaParserin;
92 extern int FtaParserdebug;
94 fta_parse_t *fta_parse_result;
95 var_defs_t *fta_parse_defines;
99 // Interface to external function lexer and parser ...
101 extern int Ext_fcnsParserparse(void);
102 extern FILE *Ext_fcnsParserin;
103 extern int Ext_fcnsParserdebug;
105 ext_fcn_list *Ext_fcns;
108 // Interface to partition definition parser
109 extern int PartnParserparse();
110 partn_def_list_t *partn_parse_result = NULL;
118 // forward declaration of local utility function
119 void generate_makefile(vector<string> &input_file_names, int nfiles,
120 vector<string> &hfta_names, opview_set &opviews,
121 vector<string> &machine_names,
122 string schema_file_name,
123 vector<string> &interface_names,
124 ifq_t *ifdb, string &config_dir_path,
127 map<string, vector<int> > &rts_hload
130 //static int split_string(char *instr,char sep, char **words,int max_words);
133 FILE *schema_summary_output = NULL; // query names
135 // Dump schema summary
136 void dump_summary(stream_query *str){
// Write a 3-line summary of one query to the global FILE*
// schema_summary_output (opened earlier when -f is given):
//   line 1: the query's name
//   line 2: the output schema's field names, '|'-separated
//   line 3: the corresponding field types, '|'-separated
// NOTE(review): this listing has gaps — the declaration of loop index f
// and the closing braces of the two for-loops (and of the function) are
// among the lines missing from this excerpt; confirm against the full file.
137 fprintf(schema_summary_output,"%s\n",str->query_name.c_str());
// Output schema of the query: a table_def holding the field list.
139 table_def *sch = str->get_output_tabledef();
141 vector<field_entry *> flds = sch->get_fields();
// Second line: field names, '|' between entries (none before the first).
143 for(f=0;f<flds.size();++f){
144 if(f>0) fprintf(schema_summary_output,"|");
145 fprintf(schema_summary_output,"%s",flds[f]->get_name().c_str());
147 fprintf(schema_summary_output,"\n");
// Third line: field types, same order and separator as the names above.
148 for(f=0;f<flds.size();++f){
149 if(f>0) fprintf(schema_summary_output,"|");
150 fprintf(schema_summary_output,"%s",flds[f]->get_type().c_str());
152 fprintf(schema_summary_output,"\n");
156 string hostname; // name of current host.
158 bool generate_stats = false;
159 string root_path = "../..";
162 int main(int argc, char **argv){
163 char tmpstr[TMPSTRLEN];
167 set<int>::iterator si;
169 vector<string> registration_query_names; // for lfta.c registration
170 map<string, vector<int> > mach_query_names; // list queries of machine
171 vector<int> snap_lengths; // for lfta.c registration
172 vector<string> interface_names; // for lfta.c registration
173 vector<string> machine_names; // machine of interface
174 vector<bool> lfta_reuse_options; // for lfta.c registration
175 vector<int> lfta_liveness_timeouts; // fot qtree.xml generation
176 vector<string> hfta_names; // hfta cource code names, for
177 // creating make file.
178 vector<string> qnames; // ensure unique names
179 map<string, int> lfta_names; // keep track of unique lftas.
182 // set these to 1 to debug the parser
184 Ext_fcnsParserdebug = 0;
186 FILE *lfta_out; // lfta.c output.
187 FILE *fta_in; // input file
188 FILE *table_schemas_in; // source tables definition file
189 FILE *query_name_output; // query names
190 FILE *qtree_output; // interconnections of query nodes
192 // -------------------------------
193 // Handling of Input Arguments
194 // -------------------------------
195 char optstr[] = "BDpLC:l:HNQMf:PSh:n:cR:";
196 const char *usage_str = "Usage: %s [-B] [-D] [-p] [-L] [-N] [-H] [-Q] [-M] [-C <config directory>] [-l <library directory>] [-f] [-P] [-S] [-n n_virtual_interfaces] [-h hostname] [-R root_path] [schema_file] input_file [input file ...]\n"
197 "\t[-B] : debug only (don't create output files)\n"
198 "\t[-D] : distributed mode (will use cluster.ifq instead of local .ifq file)\n"
199 "\t[-p] : partitioned mode (will use partition information in partition.txt to drive query optimization)\n"
200 "\t[-L] : use live_hosts.txt file to restrict queries to a set of live hosts\n"
201 "\t[-C] : use <config directory> for definition files\n"
202 "\t[-l] : use <library directory> for library queries\n"
203 "\t[-N] : output query names in query_names.txt\n"
204 "\t[-H] : create HFTA only (no schema_file)\n"
205 "\t[-Q] : use query name for hfta suffix\n"
206 "\t[-M] : generate make file and runit, stopit scripts\n"
207 "\t[-S] : enable LFTA statistics (alters Makefile).\n"
208 "\t[-f] : Output schema summary to schema_summary.txt\n"
209 "\t[-P] : link with PADS\n"
210 "\t[-h] : override host name.\n"
211 "\t[-c] : clean out Makefile and hfta_*.cc first.\n"
212 "\t[-R] : path to root of GS-lite\n"
215 // parameters gathered from command line processing
216 string external_fcns_path;
217 // string internal_fcn_path;
218 string config_dir_path;
219 string library_path = "./";
220 vector<string> input_file_names;
221 string schema_file_name;
222 bool debug_only = false;
223 bool hfta_only = false;
224 bool output_query_names = false;
225 bool output_schema_summary=false;
226 bool numeric_hfta_flname = true;
227 bool create_makefile = false;
228 bool distributed_mode = false;
229 bool partitioned_mode = false;
230 bool use_live_hosts_file = false;
231 bool use_pads = false;
232 bool clean_make = false;
233 int n_virtual_interfaces = 1;
236 while((chopt = getopt(argc,argv,optstr)) != -1){
242 distributed_mode = true;
245 partitioned_mode = true;
248 use_live_hosts_file = true;
252 config_dir_path = string(optarg) + string("/");
256 library_path = string(optarg) + string("/");
259 output_query_names = true;
262 numeric_hfta_flname = false;
265 if(schema_file_name == ""){
270 output_schema_summary=true;
273 create_makefile=true;
294 n_virtual_interfaces = atoi(optarg);
295 if(n_virtual_interfaces < 1 || n_virtual_interfaces > 128){
296 fprintf(stderr,"Warning, %d virtual interfaces specified, valid values are between 1 and 128 inclusive\n",n_virtual_interfaces);
297 n_virtual_interfaces = 1;
302 fprintf(stderr,"Error, argument %c not recognized.\n",optopt);
303 fprintf(stderr,"%s\n", usage_str);
306 fprintf(stderr, "Argument was %c\n", optopt);
307 fprintf(stderr,"Invalid arguments\n");
308 fprintf(stderr,"%s\n", usage_str);
314 for (int i = 0; i < argc; ++i) {
315 if((schema_file_name == "") && !hfta_only){
316 schema_file_name = argv[i];
318 input_file_names.push_back(argv[i]);
322 if(input_file_names.size() == 0){
323 fprintf(stderr,"%s\n", usage_str);
328 string clean_cmd = "rm Makefile hfta_*.cc";
329 int clean_ret = system(clean_cmd.c_str());
331 fprintf(stderr,"Warning, return value %d when trying to clean out old source files.\n", clean_ret);
336 nic_prop_db *npdb = new nic_prop_db(config_dir_path);
338 // Open globally used file names.
340 // prepend config directory to schema file
341 schema_file_name = config_dir_path + schema_file_name;
342 external_fcns_path = config_dir_path + string("external_fcns.def");
343 string ifx_fname = config_dir_path + string("ifres.xml");
345 // Find interface query file(s).
347 gethostname(tmpstr,TMPSTRLEN);
350 hostname_len = strlen(tmpstr);
351 string ifq_fname = config_dir_path + (distributed_mode ? "cluster" : hostname) + string(".ifq");
352 vector<string> ifq_fls;
354 ifq_fls.push_back(ifq_fname);
357 // Get the field list, if it exists
358 string flist_fl = config_dir_path + "field_list.xml";
360 if((flf_in = fopen(flist_fl.c_str(), "r")) != NULL) {
361 fprintf(stderr,"Found field list file %s\n",flist_fl.c_str());
362 xml_leaves = new xml_t();
363 xmlParser_setfileinput(flf_in);
364 if(xmlParserparse()){
365 fprintf(stderr,"WARNING, could not parse field list file %s\n",flist_fl.c_str());
367 field_verifier = new field_list(xml_leaves);
372 if((table_schemas_in = fopen(schema_file_name.c_str(), "r")) == NULL) {
373 fprintf(stderr,"Can't open schema file %s\n%s\n",schema_file_name.c_str(),strerror(errno));
379 if(!(debug_only || hfta_only)){
380 if((lfta_out = fopen("lfta.c","w")) == NULL){
381 fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
387 // Get the output specification file.
389 // query, operator, operator_param, directory, bucketwidth, partitioning_se, n_partitions
390 string ospec_fl = "output_spec.cfg";
392 vector<ospec_str *> output_specs;
393 multimap<string, int> qname_to_ospec;
394 if((osp_in = fopen(ospec_fl.c_str(), "r")) != NULL) {
397 while(fgets(tmpstr,TMPSTRLEN,osp_in)){
399 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
401 // make operator type lowercase
403 for(tmpc=flds[1];*tmpc!='\0';++tmpc)
404 *tmpc = tolower(*tmpc);
406 ospec_str *tmp_ospec = new ospec_str();
407 tmp_ospec->query = flds[0];
408 tmp_ospec->operator_type = flds[1];
409 tmp_ospec->operator_param = flds[2];
410 tmp_ospec->output_directory = flds[3];
411 tmp_ospec->bucketwidth = atoi(flds[4]);
412 tmp_ospec->partitioning_flds = flds[5];
413 tmp_ospec->n_partitions = atoi(flds[6]);
414 qname_to_ospec.insert(pair<string,int>(tmp_ospec->query,output_specs.size()));
415 output_specs.push_back(tmp_ospec);
417 fprintf(stderr,"Warning, line %d corrupted in output_spec.cfg, has %d fields.\n",o_lineno,nflds);
422 fprintf(stderr,"output_spec.cfg not found. The query set has no output. exiting.\n");
427 string pspec_fl = "hfta_parallelism.cfg";
429 map<string, int> hfta_parallelism;
430 if((psp_in = fopen(pspec_fl.c_str(), "r")) != NULL){
433 while(fgets(tmpstr,TMPSTRLEN,psp_in)){
434 bool good_entry = true;
436 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
438 string hname = flds[0];
439 int par = atoi(flds[1]);
440 if(par <= 0 || par > n_virtual_interfaces){
441 fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must be between 1 and %d\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
444 if(good_entry && n_virtual_interfaces % par != 0){
445 fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must divide the number of virtual interfaces (%d), ignoring.\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
449 hfta_parallelism[hname] = par;
453 fprintf(stderr,"WARNING, no file %s found, using single copies of hftas.\n",pspec_fl.c_str());
457 // LFTA hash table sizes
458 string htspec_fl = "lfta_htsize.cfg";
459 FILE *htsp_in = NULL;
460 map<string, int> lfta_htsize;
461 if((htsp_in = fopen(htspec_fl.c_str(), "r")) != NULL){
464 while(fgets(tmpstr,TMPSTRLEN,htsp_in)){
465 bool good_entry = true;
467 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
469 string lfta_name = flds[0];
470 int htsz = atoi(flds[1]);
472 lfta_htsize[lfta_name] = htsz;
474 fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, htsize is %d, must be larger than 0.\n",o_lineno,htspec_fl.c_str(),htsz);
479 fprintf(stderr,"WARNING, no file %s found, using default LFTA hash table sizes.\n",htspec_fl.c_str());
482 // LFTA virtual interface hash split
483 string rtlspec_fl = "rts_load.cfg";
485 map<string, vector<int> > rts_hload;
486 if((rtl_in = fopen(rtlspec_fl.c_str(), "r")) != NULL){
491 while(fgets(tmpstr,TMPSTRLEN,rtl_in)){
492 bool good_entry = true;
496 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
498 iface_name = flds[0];
501 for(j=1;j<nflds;++j){
502 int h = atoi(flds[j]);
506 hload.push_back(cumm_h);
512 rts_hload[iface_name] = hload;
514 fprintf(stderr,"Warning, line %d of %s is malformed, skipping.\n",r_lineno, rtlspec_fl.c_str());
521 if(output_query_names){
522 if((query_name_output = fopen("query_names.txt","w")) == NULL){
523 fprintf(stderr,"Can't open output file %s\n%s\n","query_names.txt",strerror(errno));
528 if(output_schema_summary){
529 if((schema_summary_output = fopen("schema_summary.txt","w")) == NULL){
530 fprintf(stderr,"Can't open output file %s\n%s\n","schema_summary.txt",strerror(errno));
535 if((qtree_output = fopen("qtree.xml","w")) == NULL){
536 fprintf(stderr,"Can't open output file %s\n%s\n","qtree.xml",strerror(errno));
539 fprintf(qtree_output,"<?xml version='1.0' encoding='ISO-8859-1'?>\n");
540 fprintf(qtree_output,"<?xml-stylesheet type='text/xsl' href='qtree.xsl'?>\n");
541 fprintf(qtree_output,"<QueryNodes>\n");
544 // Get an initial Schema
547 // Parse the table schema definitions.
548 fta_parse_result = new fta_parse_t();
549 FtaParser_setfileinput(table_schemas_in);
550 if(FtaParserparse()){
551 fprintf(stderr,"Table schema parse failed.\n");
554 if(fta_parse_result->parse_type != TABLE_PARSE){
555 fprintf(stderr,"ERROR, file %s is not a table definition file.\n",schema_file_name.c_str());
558 Schema = fta_parse_result->tables;
560 // Ensure that all schema_ids, if set, are distinct.
561 // Obsolete? There is code elsewhere to ensure that schema IDs are
562 // distinct on a per-interface basis.
566 for(int t=0;t<Schema->size();++t){
567 int sch_id = Schema->get_table(t)->get_schema_id();
569 if(found_ids.find(sch_id) != found_ids.end()){
570 dup_ids.insert(sch_id);
572 found_ids.insert(sch_id);
575 if(dup_ids.size()>0){
576 fprintf(stderr, "Error, the schema has duplicate schema_ids:");
577 for(auto dit=dup_ids.begin();dit!=dup_ids.end(); ++dit)
578 fprintf(stderr," %d",(*dit));
579 fprintf(stderr,"\n");
586 // Process schema field inheritance
588 retval = Schema->unroll_tables(err_str);
590 fprintf(stderr,"Error processing schema field inheritance:\n %s\n", err_str.c_str() );
594 // hfta only => we will try to fetch schemas from the registry.
595 // therefore, start off with an empty schema.
596 Schema = new table_list();
600 // Open and parse the external functions file.
601 Ext_fcnsParserin = fopen(external_fcns_path.c_str(),"r");
602 if(Ext_fcnsParserin == NULL){
603 fprintf(stderr,"Warning, can't find external functions definition file (external_fcns.def).\n");
604 Ext_fcns = new ext_fcn_list();
606 if(Ext_fcnsParserparse()){
607 fprintf(stderr,"Warning, error parsing external functions definition file (external_fcns.def).\n");
608 Ext_fcns = new ext_fcn_list();
611 if(Ext_fcns->validate_fcns(err_str)){
612 fprintf(stderr,"Error processing external functions definition file (external_fcns.def):\n%s\n",err_str.c_str());
616 // Open and parse the interface resources file.
617 // ifq_t *ifaces_db = new ifq_t();
619 // if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
620 // fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
621 // ifx_fname.c_str(), ierr.c_str());
624 // if(ifaces_db->load_ifqs(ifq_fname, ierr)){
625 // fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
626 // ifq_fname.c_str(), ierr.c_str());
631 // The LFTA code string.
632 // Put the standard preamble here.
633 // NOTE: the hash macros, fcns should go into the run time
634 map<string, string> lfta_val;
635 map<string, string> lfta_prefilter_val;
638 "#include <limits.h>\n"
639 "#include \"rts.h\"\n"
640 "#include \"fta.h\"\n"
641 "#include \"lapp.h\"\n"
642 "#include \"rts_udaf.h\"\n"
643 "#include<stdio.h>\n"
644 "#include <float.h>\n"
645 "#include \"rdtsc.h\"\n"
646 "#include \"watchlist.h\"\n\n"
649 // Get any locally defined parsing headers
651 memset(&glob_result, 0, sizeof(glob_result));
653 // do the glob operation TODO should be from GSROOT
654 int return_value = glob("../../include/lfta/local/*h", GLOB_TILDE, NULL, &glob_result);
655 if(return_value == 0){
657 for(size_t i = 0; i < glob_result.gl_pathc; ++i) {
659 int nflds = split_string(glob_result.gl_pathv[i],'/',flds,1000);
660 lfta_header += "#include \"local/"+string(flds[nflds-1])+"\"\n";
664 fprintf(stderr,"Warning, glob on ../../include/lfta/local/*h failed.\n");
668 "#define IS_FILLED(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> ((bucket & 15)<<1)))\n"
669 "#define IS_NEW(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> (((bucket & 15) << 1) + 1)))\n"
670 "#define SET_EMPTY(bitmap,bucket) (bitmap[bucket >> 4] &= (~(0x80000000 >> ((bucket & 15)<<1))))\n"
671 "#define SET_FILLED_AND_NEW(bitmap,bucket) (bitmap[bucket >> 4] |= (0xC0000000 >> ((bucket & 15)<<1)))\n"
676 "gs_uint64_t (*lfta_prefilter)(void *pkt) = NULL; // set at fta_init\n"
678 "#define SLOT_FILLED 0x04\n"
679 "#define SLOT_GEN_BITS 0x03\n"
680 "#define SLOT_HASH_BITS 0xfffffff8\n"
681 "#define SET_BF_BIT(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num)) + (bf_index)) >> 3] |= (0x80 >> ((((bucket) * (bf_num)) + (bf_index)) & 7)))\n"
682 "#define IS_BF_SET(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num))+(bf_index)) >> 3] & (0x80 >> ((((bucket) * (bf_num))+(bf_index)) & 7)))\n"
683 "#define SET_BF_EMPTY(bf_table, bf_num, bf_index, bucket) (bf_table[((bucket * bf_num)+bf_index) >> 3] &= (~0x80 >> (((bucket * bf_num)+bf_index) & 7)))\n"
686 "#define lfta_BOOL_to_hash(x) (x)\n"
687 "#define lfta_USHORT_to_hash(x) (x)\n"
688 "#define lfta_UINT_to_hash(x) (x)\n"
689 "#define lfta_IP_to_hash(x) (x)\n"
690 "#define lfta_IPV6_to_hash(x) ( (x.v[0]) ^ (x.v[1]) ^ (x.v[2]) ^ (x.v[3]))\n"
691 "#define lfta_INT_to_hash(x) (gs_uint32_t)(x)\n"
692 "#define lfta_ULLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
693 "#define lfta_LLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
694 "#define lfta_FLOAT_to_hash(x) ( (( ((gs_uint64_t)(x)) >>32)&0xffffffff) ^ (((gs_uint64_t)(x))&0xffffffff) )\n"
695 "static gs_uint32_t lfta_V_STR_to_hash(struct gs_string x){\n"
696 " gs_uint32_t i,ret=0,tmp_sum = 0;\n"
697 " for(i=0;i<x.length;++i){\n"
698 " tmp_sum |= (x.data[i]) << (8*(i%4));\n"
704 " if((i%4)!=0) ret ^=tmp_sum;\n"
710 //////////////////////////////////////////////////////////////////
711 ///// Get all of the query parse trees
715 int hfta_count = 0; // for numeric suffixes to hfta .cc files
717 //---------------------------
718 // Global info needed for post processing.
720 // Set of operator views ref'd in the query set.
722 // lftas on a per-machine basis.
723 map<string, vector<stream_query *> > lfta_mach_lists;
724 int nfiles = input_file_names.size();
725 vector<stream_query *> hfta_list; // list of hftas.
726 map<string, stream_query *> sq_map; // map from query name to stream query.
729 //////////////////////////////////////////
731 // Open and parse the interface resources file.
732 ifq_t *ifaces_db = new ifq_t();
734 if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
735 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
736 ifx_fname.c_str(), ierr.c_str());
739 if(ifaces_db->load_ifqs(ifq_fls[0], ierr)){
740 fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
741 ifq_fls[0].c_str(), ierr.c_str());
745 map<string, string> qname_to_flname; // for detecting duplicate query names
749 // Parse the files to create a vector of parse trees.
750 // Load qnodes with information to perform a topo sort
751 // based on query dependencies.
752 vector<query_node *> qnodes; // for topo sort.
753 map<string,int> name_node_map; // map query name to qnodes entry
754 for(i=0;i<input_file_names.size();i++){
756 if((fta_in = fopen(input_file_names[i].c_str(), "r")) == NULL) {
757 fprintf(stderr,"Can't open input file %s\n%s",input_file_names[i].c_str(),strerror(errno));
760 fprintf(stderr,"Parsing file %s\n",input_file_names[i].c_str());
762 // Parse the FTA query
763 fta_parse_result = new fta_parse_t();
764 FtaParser_setfileinput(fta_in);
765 if(FtaParserparse()){
766 fprintf(stderr,"FTA parse failed.\n");
769 if(fta_parse_result->parse_type != QUERY_PARSE){
770 fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
774 // returns a list of parse trees
775 vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
776 for(p=0;p<qlist.size();++p){
777 table_exp_t *fta_parse_tree = qlist[p];
778 // query_parse_trees.push_back(fta_parse_tree);
780 // compute the default name -- extract from query name
781 strcpy(tmpstr,input_file_names[i].c_str());
782 char *qname = strrchr(tmpstr,PATH_DELIM);
787 char *qname_end = strchr(qname,'.');
788 if(qname_end != NULL) *qname_end = '\0';
789 string qname_str = qname;
790 string imputed_qname = impute_query_name(fta_parse_tree, qname_str);
792 // Determine visibility. Should I be attaching all of the output methods?
793 if(qname_to_ospec.count(imputed_qname)>0)
794 fta_parse_tree->set_visible(true);
796 fta_parse_tree->set_visible(false);
799 // Create a manipulable representation of the parse tree.
800 // the qnode inherits the visibility assigned to the parse tree.
801 int pos = qnodes.size();
802 qnodes.push_back( new query_node(pos, imputed_qname, input_file_names[i], fta_parse_tree ));
803 name_node_map[ qnodes[pos]->name ] = pos;
804 //printf("name_node_map[ %s ] = %d\n",qnodes[pos]->name.c_str(),pos);
805 // qnames.push_back(impute_query_name(fta_parse_tree, qname_str));
806 // qfiles.push_back(i);
808 // Check for duplicate query names
809 // NOTE : in hfta-only generation, I should
810 // also check with the names of the registered queries.
811 if(qname_to_flname.count(qnodes[pos]->name) > 0){
812 fprintf(stderr,"ERROR duplicate query name %s in files %s and %s.\n",
813 qnodes[pos]->name.c_str(), (qname_to_flname[qnodes[pos]->name]).c_str(), input_file_names[i].c_str());
816 if(Schema->find_tbl(qnodes[pos]->name) >= 0){
817 fprintf(stderr,"ERROR duplicate query name %s (file %s) was already defined as a PROTOCOL.\n",
818 qnodes[pos]->name.c_str(), input_file_names[i].c_str());
821 qname_to_flname[qnodes[pos]->name] = input_file_names[i].c_str();
827 // Add the library queries
830 for(pos=0;pos<qnodes.size();++pos){
832 for(fi = 0;fi<qnodes[pos]->refd_tbls.size();++fi){
833 string src_tbl = qnodes[pos]->refd_tbls[fi];
834 if(qname_to_flname.count(src_tbl) == 0){
835 int last_sep = src_tbl.find_last_of('/');
836 if(last_sep != string::npos){
837 fprintf(stderr,"Finding library query %s for query %s\n",src_tbl.c_str(),qnodes[pos]->name.c_str());
838 string target_qname = src_tbl.substr(last_sep+1);
839 string qpathname = library_path + src_tbl + ".gsql";
840 if((fta_in = fopen(qpathname.c_str(), "r")) == NULL) {
841 fprintf(stderr,"Can't open library file %s, referenced by query %s in file %s\n\t%s\n",qpathname.c_str(),qnodes[pos]->name.c_str(), qname_to_flname[qnodes[pos]->name].c_str(), strerror(errno));
843 fprintf(stderr,"After exit\n");
845 fprintf(stderr,"Parsing file %s\n",qpathname.c_str());
846 // Parse the FTA query
847 fta_parse_result = new fta_parse_t();
848 FtaParser_setfileinput(fta_in);
849 if(FtaParserparse()){
850 fprintf(stderr,"FTA parse failed.\n");
853 if(fta_parse_result->parse_type != QUERY_PARSE){
854 fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
858 map<string, int> local_query_map;
859 vector<string> local_query_names;
860 vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
861 for(p=0;p<qlist.size();++p){
862 table_exp_t *fta_parse_tree = qlist[p];
863 fta_parse_tree->set_visible(false); // assumed to not produce output
864 string imputed_qname = impute_query_name(fta_parse_tree, target_qname);
865 if(imputed_qname == target_qname)
866 imputed_qname = src_tbl;
867 if(local_query_map.count(imputed_qname)>0){
868 fprintf(stderr,"ERROR, multiple queries named %s in library file %s\n",imputed_qname.c_str(), qpathname.c_str());
871 local_query_map[ imputed_qname ] = p;
872 local_query_names.push_back(imputed_qname);
875 if(local_query_map.count(src_tbl)==0){
876 fprintf(stderr,"ERROR, library query file %s has no query named %s\n",qpathname.c_str(),target_qname.c_str());
880 vector<int> worklist;
881 set<int> added_queries;
882 vector<query_node *> new_qnodes;
883 worklist.push_back(local_query_map[target_qname]);
884 added_queries.insert(local_query_map[target_qname]);
886 int qpos = qnodes.size();
887 for(qq=0;qq<worklist.size();++qq){
888 int q_id = worklist[qq];
889 query_node *new_qnode = new query_node(qpos+qq, local_query_names[q_id], qpathname, qlist[q_id] );
890 new_qnodes.push_back( new_qnode);
891 vector<string> refd_tbls = new_qnode->refd_tbls;
893 for(ff = 0;ff<refd_tbls.size();++ff){
894 if(local_query_map.count(refd_tbls[ff])>0 && added_queries.count(local_query_map[refd_tbls[ff]])==0){
896 if(name_node_map.count(refd_tbls[ff])>0){
897 fprintf(stderr,"ERROR, query %s occurs both in the regular query set, file %s, and in library file %s\n",refd_tbls[ff].c_str(), qname_to_flname[refd_tbls[ff]].c_str(), qpathname.c_str() );
900 worklist.push_back(local_query_map[refd_tbls[ff]]);
906 for(qq=0;qq<new_qnodes.size();++qq){
907 int qpos = qnodes.size();
908 qnodes.push_back(new_qnodes[qq]);
909 name_node_map[qnodes[qpos]->name ] = qpos;
910 qname_to_flname[qnodes[qpos]->name ] = qpathname;
924 //---------------------------------------
929 string udop_missing_sources;
930 for(i=0;i<qnodes.size();++i){
932 for(fi = 0;fi<qnodes[i]->refd_tbls.size();++fi){
933 int sid = Schema->find_tbl(qnodes[i]->refd_tbls[fi]);
935 if(Schema->get_schema_type(sid) == OPERATOR_VIEW_SCHEMA){
936 if(name_node_map.count(qnodes[i]->refd_tbls[fi]) == 0){
937 int pos = qnodes.size();
938 qnodes.push_back( new query_node(pos, qnodes[i]->refd_tbls[fi], Schema));
939 name_node_map[ qnodes[pos]->name ] = pos;
940 qnodes[pos]->is_externally_visible = false; // its visible
941 // Need to mark the source queries as visible.
943 string missing_sources = "";
944 for(si=0;si<qnodes[pos]->refd_tbls.size();++si){
945 string src_tbl = qnodes[pos]->refd_tbls[si];
946 if(name_node_map.count(src_tbl)==0){
947 missing_sources += src_tbl + " ";
950 if(missing_sources != ""){
951 udop_missing_sources += "\tUDOP "+qnodes[pos]->name+" references undefined tables "+missing_sources+"\n";
958 if(udop_missing_sources != ""){
959 fprintf(stderr,"ERROR, User Defined OPerators reference source tables that are not part of the query set:\n%s",udop_missing_sources.c_str());
965 ////////////////////////////////////////////////////////////////////
966 /// Check parse trees to verify that some
967 /// global properties are met :
968 /// if q1 reads from q2, then
969 /// q2 is processed before q1
970 /// q1 can supply q2's parameters
971 /// Verify there is no cycle in the reads-from graph.
973 // Compute an order in which to process the
976 // Start by building the reads-from lists.
979 for(i=0;i<qnodes.size();++i){
981 vector<string> refd_tbls = qnodes[i]->refd_tbls;
982 for(fi = 0;fi<refd_tbls.size();++fi){
983 if(name_node_map.count(refd_tbls[fi])>0){
984 //printf("query %d (%s) reads from %s (%d)\n", i, qnodes[i]->name.c_str(),refd_tbls[fi].c_str(),name_node_map[refd_tbls[fi]]);
985 (qnodes[i]->reads_from).insert(name_node_map[refd_tbls[fi]]);
991 // If one query reads the result of another,
992 // check for parameter compatibility. Currently it must
993 // be an exact match. I will move to requiring
994 // containment after re-ordering, but will require
995 // some analysis for code generation which is not
997 //printf("There are %d query nodes.\n",qnodes.size());
1000 for(i=0;i<qnodes.size();++i){
1001 vector<var_pair_t *> target_params = qnodes[i]->params;
1002 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1003 vector<var_pair_t *> source_params = qnodes[(*si)]->params;
1004 if(target_params.size() != source_params.size()){
1005 fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
1009 for(p=0;p<target_params.size();++p){
1010 if(! (target_params[p]->name == source_params[p]->name &&
1011 target_params[p]->val == source_params[p]->val ) ){
1012 fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
1021 // Start by counting inedges.
1022 for(i=0;i<qnodes.size();++i){
1023 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1024 qnodes[(*si)]->n_consumers++;
1028 // The roots are the nodes with indegree zero.
1030 for(i=0;i<qnodes.size();++i){
1031 if(qnodes[i]->n_consumers == 0){
1032 if(qnodes[i]->is_externally_visible == false){
1033 fprintf(stderr,"WARNING: query %s (file %s) is a root query but it isn't visible. Ignored.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str());
1039 // Remove the parts of the subtree that produce no output.
1040 set<int> valid_roots;
1041 set<int> discarded_nodes;
1042 set<int> candidates;
1043 while(roots.size() >0){
1044 for(si=roots.begin();si!=roots.end();++si){
1045 if(qnodes[(*si)]->is_externally_visible){
1046 valid_roots.insert((*si));
1048 discarded_nodes.insert((*si));
1049 set<int>::iterator sir;
1050 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1051 qnodes[(*sir)]->n_consumers--;
1052 if(qnodes[(*sir)]->n_consumers == 0)
1053 candidates.insert( (*sir));
1060 roots = valid_roots;
1061 if(discarded_nodes.size()>0){
1062 fprintf(stderr,"Warning, the following queries were discarded because they produce no output:\n");
1064 for(si=discarded_nodes.begin();si!=discarded_nodes.end();++si){
1065 if(di>0 && (di%8)==0) fprintf(stderr,"\n");
1067 fprintf(stderr," %s",qnodes[(*si)]->name.c_str());
1069 fprintf(stderr,"\n");
1072 // Compute the sources_to set, ignoring discarded nodes.
1073 for(i=0;i<qnodes.size();++i){
1074 if(discarded_nodes.count(i)==0)
1075 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1076 qnodes[(*si)]->sources_to.insert(i);
1081 // Find the nodes that are shared by multiple visible subtrees.
1082 // The roots become inferred visible nodes.
1084 // Find the visible nodes.
1085 vector<int> visible_nodes;
1086 for(i=0;i<qnodes.size();i++){
1087 if(qnodes[i]->is_externally_visible){
1088 visible_nodes.push_back(i);
1092 // Find UDOPs referenced by visible nodes.
1094 for(i=0;i<visible_nodes.size();++i){
1095 workq.push_back(visible_nodes[i]);
1097 while(!workq.empty()){
1098 int node = workq.front();
1100 set<int>::iterator children;
1101 if(qnodes[node]->is_udop && qnodes[node]->is_externally_visible == false){
1102 qnodes[node]->is_externally_visible = true;
1103 visible_nodes.push_back(node);
1104 for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1105 if(qnodes[(*children)]->is_externally_visible == false){
1106 qnodes[(*children)]->is_externally_visible = true;
1107 visible_nodes.push_back((*children));
1111 for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1112 workq.push_back((*children));
1119 for(i=0;i<qnodes.size();i++){
1120 qnodes[i]->subtree_roots.clear();
1123 // Walk the tree defined by a visible node, not descending into
1124 // subtrees rooted by a visible node. Mark the node visited with
1125 // the visible node ID.
1126 for(i=0;i<visible_nodes.size();++i){
1128 vroots.insert(visible_nodes[i]);
1129 while(vroots.size()>0){
1130 for(si=vroots.begin();si!=vroots.end();++si){
1131 qnodes[(*si)]->subtree_roots.insert(visible_nodes[i]);
1133 set<int>::iterator sir;
1134 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1135 if(! qnodes[(*sir)]->is_externally_visible){
1136 candidates.insert( (*sir));
1140 vroots = candidates;
1144 // Find the nodes in multiple visible node subtrees, but with no parent
1145 // that is in multiple visible node subtrees. Mark these as inferred visible nodes.
1146 done = true; // until proven otherwise
1147 for(i=0;i<qnodes.size();i++){
1148 if(qnodes[i]->subtree_roots.size()>1){
1149 bool is_new_root = true;
1150 set<int>::iterator sir;
1151 for(sir=qnodes[i]->sources_to.begin(); sir!=qnodes[i]->sources_to.end();++sir){
1152 if(qnodes[(*sir)]->subtree_roots.size()>1)
1153 is_new_root = false;
1156 qnodes[i]->is_externally_visible = true;
1157 qnodes[i]->inferred_visible_node = true;
1158 visible_nodes.push_back(i);
1169 // get visible nodes in topo ordering.
1170 // for(i=0;i<qnodes.size();i++){
1171 // qnodes[i]->n_consumers = qnodes[i]->sources_to.size();
1173 vector<int> process_order;
1174 while(roots.size() >0){
1175 for(si=roots.begin();si!=roots.end();++si){
1176 if(discarded_nodes.count((*si))==0){
1177 process_order.push_back( (*si) );
1179 set<int>::iterator sir;
1180 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1181 qnodes[(*sir)]->n_consumers--;
1182 if(qnodes[(*sir)]->n_consumers == 0)
1183 candidates.insert( (*sir));
1191 //printf("process_order.size() =%d\n",process_order.size());
1193 // Search for cyclic dependencies
1195 for(i=0;i<qnodes.size();++i){
1196 if(discarded_nodes.count(i)==0 && qnodes[i]->n_consumers > 0){
1197 if(found_dep.size() != 0) found_dep += ", ";
1198 found_dep += "query "+qnodes[i]->name+" (file "+qnodes[i]->file+")";
1201 if(found_dep.size()>0){
1202 fprintf(stderr,"ERROR, the following queries contain a cyclic reads-from dependency:\n%s\n",found_dep.c_str());
1206 // Get a list of query sets, in the order to be processed.
1207 // Start at visible root and do bfs.
1208 // The query set includes queries referenced indirectly,
1209 // as sources for user-defined operators. These are needed
1210 // to ensure that they are added to the schema, but are not part
1211 // of the query tree.
1213 // stream_node_sets contains queries reachable only through the
1214 // FROM clause, so I can tell which queries to add to the stream
1215 // query. (DISABLED, UDOPS are integrated, does this cause problems?)
1217 // NOTE: this code works because in order for data to be
1218 // read by multiple hftas, the node must be externally visible.
1219 // But visible nodes define roots of process sets.
1220 // internally visible nodes can feed data only
1221 // to other nodes in the same query file.
1222 // Therefore, any access can be restricted to a file,
1223 // hfta output sharing is done only on roots
1224 // never on interior nodes.
1229 // Compute the base collection of hftas.
1230 vector<hfta_node *> hfta_sets;
1231 map<string, int> hfta_name_map;
1232 // vector< vector<int> > process_sets;
1233 // vector< set<int> > stream_node_sets;
1234 reverse(process_order.begin(), process_order.end()); // get listing in reverse order.
1235 // i.e. process leaves 1st.
1236 for(i=0;i<process_order.size();++i){
1237 if(qnodes[process_order[i]]->is_externally_visible == true){
1238 //printf("Visible.\n");
1239 int root = process_order[i];
1240 hfta_node *hnode = new hfta_node();
1241 hnode->name = qnodes[root]-> name;
1242 hnode->source_name = qnodes[root]-> name;
1243 hnode->is_udop = qnodes[root]->is_udop;
1244 hnode->inferred_visible_node = qnodes[root]->inferred_visible_node;
1246 vector<int> proc_list; proc_list.push_back(root);
1247 // Ensure that nodes are added only once.
1248 set<int> proc_set; proc_set.insert(root);
1249 roots.clear(); roots.insert(root);
1251 while(roots.size()>0){
1252 for(si=roots.begin();si!=roots.end();++si){
1253 //printf("Processing root %d\n",(*si));
1254 set<int>::iterator sir;
1255 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1256 //printf("reads fom %d\n",(*sir));
1257 if(qnodes[(*sir)]->is_externally_visible==false){
1258 candidates.insert( (*sir) );
1259 if(proc_set.count( (*sir) )==0){
1260 proc_set.insert( (*sir) );
1261 proc_list.push_back( (*sir) );
1270 reverse(proc_list.begin(), proc_list.end());
1271 hnode->query_node_indices = proc_list;
1272 hfta_name_map[hnode->name] = hfta_sets.size();
1273 hfta_sets.push_back(hnode);
1277 // Compute the reads_from / sources_to graphs for the hftas.
1279 for(i=0;i<hfta_sets.size();++i){
1280 hfta_node *hnode = hfta_sets[i];
1281 for(q=0;q<hnode->query_node_indices.size();q++){
1282 query_node *qnode = qnodes[hnode->query_node_indices[q]];
1283 for(s=0;s<qnode->refd_tbls.size();++s){
1284 if(hfta_name_map.count(qnode->refd_tbls[s])){
1285 int other_hfta = hfta_name_map[qnode->refd_tbls[s]];
1286 hnode->reads_from.insert(other_hfta);
1287 hfta_sets[other_hfta]->sources_to.insert(i);
1293 // Compute a topological sort of the hfta_sets.
1295 vector<int> hfta_topsort;
1297 int hnode_srcs[hfta_sets.size()];
1298 for(i=0;i<hfta_sets.size();++i){
1300 if(hfta_sets[i]->sources_to.size() == 0)
1304 while(! workq.empty()){
1305 int node = workq.front();
1307 hfta_topsort.push_back(node);
1308 set<int>::iterator stsi;
1309 for(stsi=hfta_sets[node]->reads_from.begin();stsi!=hfta_sets[node]->reads_from.end();++stsi){
1310 int parent = (*stsi);
1311 hnode_srcs[parent]++;
1312 if(hnode_srcs[parent] == hfta_sets[parent]->sources_to.size()){
1313 workq.push_back(parent);
1318 // Decorate hfta nodes with the level of parallelism given as input.
1320 map<string, int>::iterator msii;
1321 for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1322 string hfta_name = (*msii).first;
1323 int par = (*msii).second;
1324 if(hfta_name_map.count(hfta_name) > 0){
1325 hfta_sets[ hfta_name_map[hfta_name] ]->n_parallel = par;
1327 fprintf(stderr,"Warning, hfta_parallelism.cfg has an entry for %s, but its not a hfta.\n",hfta_name.c_str());
1331 // Propagate levels of parallelism: children should have a level of parallelism
1332 // as large as any of its parents. Adjust children upwards to compensate.
1333 // Start at parents and adjust children, auto-propagation will occur.
1335 for(i=hfta_sets.size()-1;i>=0;i--){
1336 set<int>::iterator stsi;
1337 for(stsi=hfta_sets[i]->reads_from.begin();stsi!=hfta_sets[i]->reads_from.end();++stsi){
1338 if(hfta_sets[i]->n_parallel > hfta_sets[ (*stsi) ]->n_parallel){
1339 hfta_sets[ (*stsi) ]->n_parallel = hfta_sets[i]->n_parallel;
1344 // Before all the name mangling, check if there are any output_spec.cfg
1345 // or hfta_parallelism.cfg entries that do not have a matching query.
1347 string dangling_ospecs = "";
1348 for(msii=qname_to_ospec.begin();msii!=qname_to_ospec.end();++msii){
1349 string oq = (*msii).first;
1350 if(hfta_name_map.count(oq) == 0){
1351 dangling_ospecs += " "+(*msii).first;
1354 if(dangling_ospecs!=""){
1355 fprintf(stderr,"WARNING, the following entries in output_spec.cfg don't have a matching query: %s\n",dangling_ospecs.c_str());
1358 string dangling_par = "";
1359 for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1360 string oq = (*msii).first;
1361 if(hfta_name_map.count(oq) == 0){
1362 dangling_par += " "+(*msii).first;
1365 if(dangling_par!=""){
1366 fprintf(stderr,"WARNING, the following entries in hfta_parallelism.cfg don't have a matching query: %s\n",dangling_par.c_str());
1371 // Replicate parallelized hftas. Do __copyX name mangling. Adjust
1372 // FROM clauses: retarget any name which is an internal node, and
1373 // any which is in hfta_sets (and which is parallelized). Add Merge nodes
1374 // when the source hfta has more parallelism than the target node.
1375 // Add new nodes to qnodes and hfta_nodes with the appropriate linkages.
1378 int n_original_hfta_sets = hfta_sets.size();
1379 for(i=0;i<n_original_hfta_sets;++i){
1380 if(hfta_sets[i]->n_parallel > 1){
1381 hfta_sets[i]->do_generation =false; // set the deletion flag for this entry.
1382 set<string> local_nodes; // names of query nodes in the hfta.
1383 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1384 local_nodes.insert(qnodes[ hfta_sets[i]->query_node_indices[h] ]->name);
1387 for(p=0;p<hfta_sets[i]->n_parallel;++p){
1388 string mangler = "__copy"+int_to_string(p);
1389 hfta_node *par_hfta = new hfta_node();
1390 par_hfta->name = hfta_sets[i]->name + mangler;
1391 par_hfta->source_name = hfta_sets[i]->name;
1392 par_hfta->is_udop = hfta_sets[i]->is_udop;
1393 par_hfta->inferred_visible_node = hfta_sets[i]->inferred_visible_node;
1394 par_hfta->n_parallel = hfta_sets[i]->n_parallel;
1395 par_hfta->parallel_idx = p;
1397 map<string, int> par_qnode_map; // qnode name-to-idx, aids dependency tracking.
1400 if(hfta_sets[i]->is_udop){
1401 int root = hfta_sets[i]->query_node_indices[0];
1403 string unequal_par_sources;
1404 set<int>::iterator rfsii;
1405 for(rfsii=hfta_sets[i]->reads_from.begin();rfsii!=hfta_sets[i]->reads_from.end();++rfsii){
1406 if(hfta_sets[(*rfsii)]->n_parallel != par_hfta->n_parallel){
1407 unequal_par_sources = hfta_sets[(*rfsii)]->name+" ("+int_to_string(hfta_sets[(*rfsii)]->n_parallel)+") ";
1410 if(unequal_par_sources != ""){
1411 fprintf(stderr,"ERROR, UDOP %s has parallelism %d, but some of its sources have a different parallelism: %s\n",hfta_sets[i]->name.c_str(), hfta_sets[i]->n_parallel, unequal_par_sources.c_str());
1416 vector<string> new_sources;
1417 for(rti=0;rti<qnodes[root]->refd_tbls.size();++rti){
1418 new_sources.push_back(qnodes[root]->refd_tbls[rti]+mangler);
1421 query_node *new_qn = new query_node(qnodes.size(), qnodes[root]->name, Schema);
1422 new_qn->name += mangler;
1423 new_qn->mangler = mangler;
1424 new_qn->refd_tbls = new_sources;
1425 par_hfta->query_node_indices.push_back(qnodes.size());
1426 par_qnode_map[new_qn->name] = qnodes.size();
1427 name_node_map[ new_qn->name ] = qnodes.size();
1428 qnodes.push_back(new_qn);
1430 // regular query node
1431 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1432 int hqn_idx = hfta_sets[i]->query_node_indices[h];
1433 table_exp_t *dup_pt = dup_table_exp(qnodes[hqn_idx]->parse_tree);
1434 // rehome the from clause on mangled names.
1435 // create merge nodes as needed for external sources.
1436 for(f=0;f<dup_pt->fm->tlist.size();++f){
1437 if(local_nodes.count(dup_pt->fm->tlist[f]->schema_name)>0){
1438 dup_pt->fm->tlist[f]->schema_name += mangler;
1439 }else if(hfta_name_map.count(dup_pt->fm->tlist[f]->schema_name)>0){
1440 // Ref's an external HFTA. No parallelism => leave as is. Else, if level of parallelism of the two hftas is the same, rename by the mangler. Else, there must be more sources, so create a merge node.
1441 int other_hidx = hfta_name_map[dup_pt->fm->tlist[f]->schema_name];
1442 if(par_hfta->n_parallel == hfta_sets[other_hidx]->n_parallel){
1443 dup_pt->fm->tlist[f]->schema_name += mangler;
1445 vector<string> src_tbls;
1446 int stride = hfta_sets[other_hidx]->n_parallel / par_hfta->n_parallel;
1448 fprintf(stderr,"INTERNAL ERROR, parent hfta %s has a larger parallelism than its child %s\n",par_hfta->name.c_str(), hfta_sets[other_hidx]->name.c_str());
1451 for(s=0;s<stride;++s){
1452 string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s+stride*p);
1453 src_tbls.push_back(ext_src_name);
1455 table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1456 string merge_node_name = qnodes[hqn_idx]->name+"__copy"+int_to_string(p)+"__merge"+int_to_string(f);
1457 dup_pt->fm->tlist[f]->schema_name = merge_node_name;
1458 // Make a qnode to represent the new merge node
1459 query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1460 qn_pt->refd_tbls = src_tbls;
1461 qn_pt->is_udop = false;
1462 qn_pt->is_externally_visible = false;
1463 qn_pt->inferred_visible_node = false;
1464 par_hfta->query_node_indices.push_back(qnodes.size());
1465 par_qnode_map[merge_node_name] = qnodes.size();
1466 name_node_map[ merge_node_name ] = qnodes.size();
1467 qnodes.push_back(qn_pt);
1471 query_node *new_qn = new query_node(qnodes.size(),qnodes[hqn_idx]->name+"__copy"+int_to_string(p),qnodes[hqn_idx]->file,dup_pt);
1472 for(f=0;f<dup_pt->fm->tlist.size();++f){
1473 new_qn->refd_tbls.push_back(dup_pt->fm->tlist[f]->schema_name);
1475 new_qn->params = qnodes[hqn_idx]->params;
1476 new_qn->is_udop = false;
1477 new_qn->is_externally_visible = qnodes[hqn_idx]->is_externally_visible;
1478 new_qn->inferred_visible_node = qnodes[hqn_idx]->inferred_visible_node;
1479 par_hfta->query_node_indices.insert(par_hfta->query_node_indices.begin(),qnodes.size());
1480 par_qnode_map[new_qn->name] = qnodes.size();
1481 name_node_map[ new_qn->name ] = qnodes.size();
1482 qnodes.push_back(new_qn);
1485 hfta_name_map[par_hfta->name] = hfta_sets.size();
1486 hfta_sets.push_back(par_hfta);
1489 // This hfta isn't being parallelized, but add merge nodes for any parallelized
1491 if(!hfta_sets[i]->is_udop){
1492 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1493 int hqn_idx = hfta_sets[i]->query_node_indices[h];
1494 for(f=0;f<qnodes[hqn_idx]->parse_tree->fm->tlist.size();++f){
1495 if(hfta_name_map.count(qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name)>0){
1496 // Ref's an external HFTA. No parallelism => leave as is. Else, if level of parallelism of the two hftas is the same, rename by the mangler. Else, there must be more sources, so create a merge node.
1497 int other_hidx = hfta_name_map[qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name];
1498 if(hfta_sets[i]->n_parallel != hfta_sets[other_hidx]->n_parallel){
1499 vector<string> src_tbls;
1500 for(s=0;s<hfta_sets[other_hidx]->n_parallel;++s){
1501 string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s);
1502 src_tbls.push_back(ext_src_name);
1504 table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1505 string merge_node_name = qnodes[hqn_idx]->name+"__merge"+int_to_string(f);
1506 qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name = merge_node_name;
1507 // Make a qnode to represent the new merge node
1508 query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1509 qn_pt->refd_tbls = src_tbls;
1510 qn_pt->is_udop = false;
1511 qn_pt->is_externally_visible = false;
1512 qn_pt->inferred_visible_node = false;
1513 hfta_sets[i]->query_node_indices.insert(hfta_sets[i]->query_node_indices.begin(),qnodes.size());
1514 name_node_map[ merge_node_name ] = qnodes.size();
1515 qnodes.push_back(qn_pt);
1524 // Rebuild the reads_from / sources_to lists in the qnodes
1525 for(q=0;q<qnodes.size();++q){
1526 qnodes[q]->reads_from.clear();
1527 qnodes[q]->sources_to.clear();
1529 for(q=0;q<qnodes.size();++q){
1530 for(s=0;s<qnodes[q]->refd_tbls.size();++s){
1531 if(name_node_map.count(qnodes[q]->refd_tbls[s])>0){
1532 int rf = name_node_map[qnodes[q]->refd_tbls[s]];
1533 qnodes[q]->reads_from.insert(rf);
1534 qnodes[rf]->sources_to.insert(q);
1539 // Rebuild the reads_from / sources_to lists in hfta_sets
1540 for(q=0;q<hfta_sets.size();++q){
1541 hfta_sets[q]->reads_from.clear();
1542 hfta_sets[q]->sources_to.clear();
1544 for(q=0;q<hfta_sets.size();++q){
1545 for(s=0;s<hfta_sets[q]->query_node_indices.size();++s){
1546 int node = hfta_sets[q]->query_node_indices[s];
1547 set<int>::iterator rfsii;
1548 for(rfsii=qnodes[node]->reads_from.begin();rfsii!=qnodes[node]->reads_from.end();++rfsii){
1549 if(hfta_name_map.count(qnodes[(*rfsii)]->name)>0){
1550 hfta_sets[q]->reads_from.insert(hfta_name_map[qnodes[(*rfsii)]->name]);
1551 hfta_sets[hfta_name_map[qnodes[(*rfsii)]->name]]->sources_to.insert(q);
1558 for(q=0;q<qnodes.size();++q){
1559 printf("qnode %d reads-from %d:",q,qnodes[q]->reads_from.size());
1560 set<int>::iterator rsii;
1561 for(rsii=qnodes[q]->reads_from.begin();rsii!=qnodes[q]->reads_from.end();++rsii)
1562 printf(" %d",(*rsii));
1563 printf(", and sources-to %d:",qnodes[q]->sources_to.size());
1564 for(rsii=qnodes[q]->sources_to.begin();rsii!=qnodes[q]->sources_to.end();++rsii)
1565 printf(" %d",(*rsii));
1569 for(q=0;q<hfta_sets.size();++q){
1570 if(hfta_sets[q]->do_generation==false)
1572 printf("hfta %d (%s) reads-from %d:",q,hfta_sets[q]->name.c_str(),hfta_sets[q]->reads_from.size());
1573 set<int>::iterator rsii;
1574 for(rsii=hfta_sets[q]->reads_from.begin();rsii!=hfta_sets[q]->reads_from.end();++rsii)
1575 printf(" %d",(*rsii));
1576 printf(", and sources-to %d:",hfta_sets[q]->sources_to.size());
1577 for(rsii=hfta_sets[q]->sources_to.begin();rsii!=hfta_sets[q]->sources_to.end();++rsii)
1578 printf(" %d",(*rsii));
1585 // Re-topo sort the hftas
1586 hfta_topsort.clear();
1588 int hnode_srcs_2[hfta_sets.size()];
1589 for(i=0;i<hfta_sets.size();++i){
1590 hnode_srcs_2[i] = 0;
1591 if(hfta_sets[i]->sources_to.size() == 0 && hfta_sets[i]->do_generation){
1596 while(workq.empty() == false){
1597 int node = workq.front();
1599 hfta_topsort.push_back(node);
1600 set<int>::iterator stsii;
1601 for(stsii=hfta_sets[node]->reads_from.begin();stsii!=hfta_sets[node]->reads_from.end();++stsii){
1602 int child = (*stsii);
1603 hnode_srcs_2[child]++;
1604 if(hnode_srcs_2[child] == hfta_sets[child]->sources_to.size()){
1605 workq.push_back(child);
1610 // Ensure that all of the query_node_indices in hfta_sets are topologically
1611 // sorted, don't rely on assumptions that all transforms maintain some kind of order.
1612 for(i=0;i<hfta_sets.size();++i){
1613 if(hfta_sets[i]->do_generation){
1614 map<int,int> n_accounted;
1615 vector<int> new_order;
1617 vector<int>::iterator vii;
1618 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1619 n_accounted[(*vii)]= 0;
1621 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1622 set<int>::iterator rfsii;
1623 for(rfsii=qnodes[(*vii)]->reads_from.begin();rfsii!=qnodes[(*vii)]->reads_from.end();++rfsii){
1624 if(n_accounted.count((*rfsii)) == 0){
1625 n_accounted[(*vii)]++;
1628 if(n_accounted[(*vii)] == qnodes[(*vii)]->reads_from.size()){
1629 workq.push_back((*vii));
1633 while(workq.empty() == false){
1634 int node = workq.front();
1636 new_order.push_back(node);
1637 set<int>::iterator stsii;
1638 for(stsii=qnodes[node]->sources_to.begin();stsii!=qnodes[node]->sources_to.end();++stsii){
1639 if(n_accounted.count((*stsii))){
1640 n_accounted[(*stsii)]++;
1641 if(n_accounted[(*stsii)] == qnodes[(*stsii)]->reads_from.size()){
1642 workq.push_back((*stsii));
1647 hfta_sets[i]->query_node_indices = new_order;
1655 /// Global checking is done, start the analysis and translation
1656 /// of the query parse tree in the order specified by process_order
1659 // Get a list of the LFTAs for global lfta optimization
1660 // TODO: separate building operators from splitting lftas,
1661 // that will make optimizations such as predicate pushing easier.
1662 vector<stream_query *> lfta_list;
1663 stream_query *rootq;
1666 map<string, map<int, string> > schema_of_schemaid; // ensure that schema IDs are unique to an <interface, schema>
1668 for(qi=hfta_topsort.size()-1;qi>=0;--qi){
1670 int hfta_id = hfta_topsort[qi];
1671 vector<int> curr_list = hfta_sets[hfta_id]->query_node_indices;
1675 // Two possibilities, either its a UDOP, or its a collection of queries.
1676 // if(qnodes[curr_list.back()]->is_udop)
1677 if(hfta_sets[hfta_id]->is_udop){
1678 int node_id = curr_list.back();
1679 int udop_schref = Schema->find_tbl(qnodes[node_id]->file);
1680 opview_entry *opv = new opview_entry();
1682 // Many of the UDOP properties aren't currently used.
1683 opv->parent_qname = "no_parent";
1684 opv->root_name = qnodes[node_id]->name;
1685 opv->view_name = qnodes[node_id]->file;
1687 sprintf(tmpstr,"%s_UDOP%d_%s",qnodes[node_id]->name.c_str(),qi,opv->view_name.c_str());
1688 opv->udop_alias = tmpstr;
1689 opv->mangler = qnodes[node_id]->mangler;
1691 if(opv->mangler != ""){
1692 int new_udop_schref = Schema->add_duplicate_table(opv->view_name,opv->root_name);
1693 Schema->mangle_subq_names(new_udop_schref,opv->mangler);
1696 // This piece of code makes each hfta which refers to the same udop
1697 // reference a distinct running udop. Do this at query optimization time?
1698 // fmtbl->set_udop_alias(opv->udop_alias);
1700 opv->exec_fl = Schema->get_op_prop(udop_schref, string("file"));
1701 opv->liveness_timeout = atoi(Schema->get_op_prop(udop_schref, string("liveness_timeout")).c_str());
1703 vector<subquery_spec *> subq = Schema->get_subqueryspecs(udop_schref);
1705 for(s=0;s<subq.size();++s){
1706 // Validate that the fields match.
1707 subquery_spec *sqs = subq[s];
1708 string subq_name = sqs->name + opv->mangler;
1709 vector<field_entry *> flds = Schema->get_fields(subq_name);
1710 if(flds.size() == 0){
1711 fprintf(stderr,"INTERNAL ERROR: the subquery %s of view %s not found in Schema.\n",subq_name.c_str(), opv->view_name.c_str());
1714 if(flds.size() < sqs->types.size()){
1715 fprintf(stderr,"ERROR: subquery %s of view %s does not have enough fields (%lu found, %lu expected).\n",subq_name.c_str(), opv->view_name.c_str(),flds.size(), sqs->types.size());
1718 bool failed = false;
1719 for(f=0;f<sqs->types.size();++f){
1720 data_type dte(sqs->types[f],sqs->modifiers[f]);
1721 data_type dtf(flds[f]->get_type(),flds[f]->get_modifier_list());
1722 if(! dte.subsumes_type(&dtf) ){
1723 fprintf(stderr,"ERROR: subquery %s of view %s does not have the correct type for field %d (%s found, %s expected).\n",subq_name.c_str(), opv->view_name.c_str(),f,dtf.to_string().c_str(), dte.to_string().c_str());
1727 if(dte.is_temporal() && (dte.get_temporal() != dtf.get_temporal()) ){
1728 string pstr = dte.get_temporal_string();
1729 fprintf(stderr,"ERROR: subquery %s of view %s does not have the expected temporal value %s of field %d.\n",sqs->name.c_str(), opv->view_name.c_str(),pstr.c_str(),f);
1736 /// Validation done, find the subquery, make a copy of the
1737 /// parse tree, and add it to the return list.
1738 for(q=0;q<qnodes.size();++q)
1739 if(qnodes[q]->name == subq_name)
1741 if(q==qnodes.size()){
1742 fprintf(stderr,"INTERNAL ERROR: subquery %s of view %s not found in list of query names.\n",subq_name.c_str(), opv->view_name.c_str());
1748 // Cross-link to from entry(s) in all sourced-to tables.
1749 set<int>::iterator sii;
1750 for(sii=qnodes[curr_list.back()]->sources_to.begin() ;sii!=qnodes[curr_list.back()]->sources_to.end();++sii){
1751 //printf("\tUDOP %s sources_to %d (%s)\n",hfta_sets[hfta_id]->name.c_str(),(*sii),hfta_sets[(*sii)]->name.c_str());
1752 vector<tablevar_t *> tblvars = qnodes[(*sii)]->parse_tree->get_from()->get_table_list();
1754 for(ii=0;ii<tblvars.size();++ii){
1755 if(tblvars[ii]->schema_name == opv->root_name){
1756 tblvars[ii]->set_opview_idx(opviews.size());
1762 opviews.append(opv);
1765 // Analyze the parse trees in this query,
1766 // put them in rootq
1767 // vector<int> curr_list = process_sets[qi];
1770 ////////////////////////////////////////
1773 //printf("Process set %d, has %d queries\n",qi,curr_list.size());
1774 for(qj=0;qj<curr_list.size();++qj){
1776 fprintf(stderr,"Processing query %s (file %s) is_udop = %d\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(),qnodes[i]->is_udop);
1778 // Select the current query parse tree
1779 table_exp_t *fta_parse_tree = qnodes[i]->parse_tree;
1781 // if hfta only, try to fetch any missing schemas
1782 // from the registry (using the print_schema program).
1783 // Here I use a hack to avoid analyzing the query -- all referenced
1784 // tables must be in the from clause
1785 // If there is a problem loading any table, just issue a warning,
1787 tablevar_list_t *fm = fta_parse_tree->get_from();
1788 vector<string> refd_tbls = fm->get_src_tbls(Schema);
1789 // iterate over all referenced tables
1791 for(t=0;t<refd_tbls.size();++t){
1792 int tbl_ref = Schema->get_table_ref(refd_tbls[t]);
1794 if(tbl_ref < 0){ // if this table is not in the Schema
1797 string cmd="print_schema "+refd_tbls[t];
1798 FILE *schema_in = popen(cmd.c_str(), "r");
1799 if(schema_in == NULL){
1800 fprintf(stderr,"ERROR: cannot execute command %s\n",cmd.c_str());
1802 string schema_instr;
1803 while(fgets(tmpstr,TMPSTRLEN,schema_in)){
1804 schema_instr += tmpstr;
1806 fta_parse_result = new fta_parse_t();
1807 strcpy(tmp_schema_str,schema_instr.c_str());
1808 FtaParser_setstringinput(tmp_schema_str);
1809 if(FtaParserparse()){
1810 fprintf(stderr,"FTA parse failed on result from command %s.\n",cmd.c_str());
1812 if( fta_parse_result->tables != NULL){
1814 for(tl=0;tl<fta_parse_result->tables->size();++tl){
1815 Schema->add_table(fta_parse_result->tables->get_table(tl));
1818 fprintf(stderr,"ERROR command %s returned no tables.\n",cmd.c_str());
1823 fprintf(stderr,"ERROR, query %s (file %s) references stream %s, which is neither a PROTOCOL nor an externally visible query stream.\n", qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), refd_tbls[t].c_str());
1831 // Analyze the query.
1832 query_summary_class *qs = analyze_fta(fta_parse_tree, Schema, Ext_fcns, qnodes[i]->name);
1834 fprintf(stderr,"analyze_fta failed on query %s (file %s) \n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str());
1838 stream_query new_sq(qs, Schema);
1839 if(new_sq.error_code){
1840 fprintf(stderr,"ERROR, can't create query node for query %s (file %s):\n%s\n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str(),new_sq.err_str.c_str());
1844 // Add it to the Schema
1845 table_def *output_td = new_sq.get_output_tabledef();
1846 Schema->add_table(output_td);
1848 // Create a query plan from the analyzed parse tree.
1849 // If it's a query referenced via FROM, add it to the stream query.
1851 rootq->add_query(new_sq);
1853 rootq = new stream_query(new_sq);
1854 // have the stream query object inherit properties form the analyzed
1855 // hfta_node object.
1856 rootq->set_nparallel(hfta_sets[hfta_id]->n_parallel,hfta_sets[hfta_id]->parallel_idx);
1857 rootq->n_successors = hfta_sets[hfta_id]->sources_to.size();
1863 // This stream query has all its parts
1864 // Build and optimize it.
1865 //printf("translate_fta: generating plan.\n");
1866 if(rootq->generate_plan(Schema)){
1867 fprintf(stderr,"INTERNAL ERROR: Can't generate query plan for query %s.\n",rootq->query_name.c_str());
1871 // We've found the query plan head, so now add the output operators.
1872 if(qname_to_ospec.count(hfta_sets[hfta_id]->source_name)){
1873 pair< multimap<string, int>::iterator, multimap<string, int>::iterator > oset;
1874 multimap<string, int>::iterator mmsi;
1875 oset = qname_to_ospec.equal_range(hfta_sets[hfta_id]->source_name);
1876 for(mmsi=oset.first; mmsi!=oset.second; ++mmsi){
1877 rootq->add_output_operator(output_specs[(*mmsi).second]);
1883 // Perform query splitting if necessary.
1885 vector<stream_query *> split_queries = rootq->split_query(Ext_fcns, Schema, hfta_returned, ifaces_db, n_virtual_interfaces, hfta_sets[hfta_id]->n_parallel, hfta_sets[hfta_id]->parallel_idx);
1888 //for(l=0;l<split_queries.size();++l){
1889 //printf("split query %d is %s\n",l,split_queries[l]->query_name.c_str());
1895 if(split_queries.size() > 0){ // should be at least one component.
1897 // Compute the number of LFTAs.
1898 int n_lfta = split_queries.size();
1899 if(hfta_returned) n_lfta--;
1900 // Check if a schemaId constraint needs to be inserted.
1902 // Process the LFTA components.
1903 for(l=0;l<n_lfta;++l){
1904 if(lfta_names.count(split_queries[l]->query_name) == 0){
1905 // Grab the lfta for global optimization.
1906 vector<tablevar_t *> tvec = split_queries[l]->query_plan[0]->get_input_tbls();
1907 string liface = "_local_";
1908 // string lmach = "";
1909 string lmach = hostname;
1911 liface = tvec[0]->get_interface(); // iface queries have been resolved
1912 if(tvec[0]->get_machine() != ""){
1913 lmach = tvec[0]->get_machine();
1915 fprintf(stderr,"WARNING, lfta %s has empty machine name, using %s\n", split_queries[l]->query_plan[0]->node_name.c_str(), hostname.c_str());
1918 interface_names.push_back(liface);
1919 machine_names.push_back(lmach);
1922 vector<predicate_t *> schemaid_preds;
1923 for(int irv=0;irv<tvec.size();++irv){
1925 string schema_name = tvec[irv]->get_schema_name();
1926 string rvar_name = tvec[irv]->get_var_name();
1927 int schema_ref = tvec[irv]->get_schema_ref();
1930 // interface_names.push_back(liface);
1931 // machine_names.push_back(lmach);
1933 //printf("Machine is %s\n",lmach.c_str());
1935 // Check if a schemaId constraint needs to be inserted.
1936 if(schema_ref<0){ // can result from some kinds of splits
1937 schema_ref = Schema->get_table_ref(schema_name);
1939 int schema_id = Schema->get_schema_id(schema_ref); // id associated with PROTOCOL
1942 iface_t *iface = ifaces_db-> get_interface(lmach, liface, errnum, if_error);
1944 fprintf(stderr,"Error looking up the interface %s on host %s: %s\n",liface.c_str(), lmach.c_str(), if_error.c_str());
1949 if(tvec[irv]->get_interface() != "_local_"){
1950 if(iface->has_multiple_schemas()){
1951 if(schema_id<0){ // invalid schema_id
1952 fprintf(stderr,"Error, schema %s has no schema_id, but is sourced from multi-schema interface %s (ref'd in query %s)\n", schema_name.c_str(), liface.c_str(), split_queries[0]->query_name.c_str());
1955 vector<string> iface_schemas = iface->get_property("Schemas");
1956 if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
1957 fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
1960 // Ensure that in liface, schema_id is used for only one schema
1961 if(schema_of_schemaid.count(liface)==0){
1962 map<int, string> empty_map;
1963 schema_of_schemaid[liface] = empty_map;
1965 if(schema_of_schemaid[liface].count(schema_id)==0){
1966 schema_of_schemaid[liface][schema_id] = schema_name;
1968 if(schema_of_schemaid[liface][schema_id] != schema_name){
1969 fprintf(stderr, "ERROR, For interface %s, schema id %d is used in schemas %s and %s\n", liface.c_str(), schema_id, schema_name.c_str(), schema_of_schemaid[liface][schema_id].c_str());
1973 }else{ // single-schema interface
1974 schema_id = -1; // don't generate schema_id predicate
1975 vector<string> iface_schemas = iface->get_property("Schemas");
1976 if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
1977 fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
1980 if(iface_schemas.size()>1){
1981 fprintf(stderr, "ERROR, interface %s is a single-schema interface, but has %d elements in its Schemas list.\n", liface.c_str(), (int)iface_schemas.size());
1989 // If we need to check the schema_id, insert a predicate into the lfta.
1990 // TODO not just schema_id, the full all_schema_ids set.
1992 colref_t *schid_cr = new colref_t("schemaId");
1993 schid_cr->schema_ref = schema_ref;
1994 schid_cr->table_name = rvar_name;
1995 schid_cr->tablevar_ref = 0;
1996 schid_cr->default_table = false;
1997 scalarexp_t *schid_se = new scalarexp_t(schid_cr);
1998 data_type *schid_dt = new data_type("uint");
1999 schid_se->dt = schid_dt;
2001 string schid_str = int_to_string(schema_id);
2002 literal_t *schid_lit = new literal_t(schid_str.c_str(), LITERAL_INT);
2003 scalarexp_t *lit_se = new scalarexp_t(schid_lit);
2004 lit_se->dt = schid_dt;
2006 predicate_t *schid_pr = new predicate_t(schid_se, "=", lit_se);
2007 vector<cnf_elem *> clist;
2008 make_cnf_from_pr(schid_pr, clist);
2009 analyze_cnf(clist[0]);
2010 clist[0]->cost = 1; // cheap one comparison
2011 // cnf built, now insert it.
2012 split_queries[l]->query_plan[0]->append_to_where(clist[0]);
2014 // Specialized processing
2015 // filter join, get two schemaid preds
2016 string node_type = split_queries[l]->query_plan[0]->node_type();
2017 if(node_type == "filter_join"){
2018 filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
2020 fj->pred_t0.push_back(clist[0]);
2022 fj->pred_t1.push_back(clist[0]);
2024 schemaid_preds.push_back(schid_pr);
2026 // watchlist join, get the first schemaid pred
2027 if(node_type == "watch_join"){
2028 watch_join_qpn *fj = (watch_join_qpn *)split_queries[l]->query_plan[0];
2030 fj->pred_t0.push_back(clist[0]);
2031 schemaid_preds.push_back(schid_pr);
2036 // Specialized processing, currently filter join.
2037 if(schemaid_preds.size()>1){
2038 string node_type = split_queries[l]->query_plan[0]->node_type();
2039 if(node_type == "filter_join"){
2040 filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
2041 predicate_t *filter_pr = new predicate_t("OR", schemaid_preds[0], schemaid_preds[1]);
2042 vector<cnf_elem *> clist;
2043 make_cnf_from_pr(filter_pr, clist);
2044 analyze_cnf(clist[0]);
2045 clist[0]->cost = 1; // cheap one comparison
2046 fj->shared_pred.push_back(clist[0]);
2056 // Set the ht size from the recommendation, if there is one in the rec file
2057 if(lfta_htsize.count(split_queries[l]->query_name)>0){
2058 split_queries[l]->query_plan[0]->set_definition("aggregate_slots",int_to_string(lfta_htsize[split_queries[l]->query_name]));
2062 lfta_names[split_queries[l]->query_name] = lfta_list.size();
2063 split_queries[l]->set_gid(lfta_list.size()); // set lfta global id
2064 lfta_list.push_back(split_queries[l]);
2065 lfta_mach_lists[lmach].push_back(split_queries[l]);
2067 // The following is a hack,
2068 // as I should be generating LFTA code through
2069 // the stream_query object.
2071 split_queries[l]->query_plan[0]->bind_to_schema(Schema);
2073 // split_queries[l]->query_plan[0]->definitions = split_queries[l]->defines;
2076 // Create query description to embed in lfta.c
2077 string lfta_schema_str = split_queries[l]->make_schema();
2078 string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
2080 // get NIC capabilities.
2082 nic_property *nicprop = NULL;
2083 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2084 if(iface_codegen_type.size()){
2085 nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2087 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2092 lfta_val[lmach] += generate_lfta_block(split_queries[l]->query_plan[0], Schema, split_queries[l]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop);
2095 snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema));
2097 // TODO NOTE : I'd like it to be the case that registration_query_names
2098 // are the queries to be registered for subscription.
2099 // but there is some complex bookkeeping here.
2100 registration_query_names.push_back(split_queries[l]->query_name);
2101 mach_query_names[lmach].push_back(registration_query_names.size()-1);
2102 // NOTE: I will assume a 1-1 correspondence between
2103 // mach_query_names[lmach] and lfta_mach_lists[lmach]
2104 // where mach_query_names[lmach][i] contains the index into
2105 // query_names, which names the lfta, and
2106 // mach_query_names[lmach][i] is the stream_query * of the
2107 // corresponding lfta.
2108 // Later, lfta_iface_qnames are the query names matching lfta_iface_lists
2112 // check if lfta is reusable
2113 // FTA is reusable if reusable option is set explicitly or it doesn't have any parameters
2115 bool lfta_reusable = false;
2116 if (split_queries[l]->query_plan[0]->get_val_of_def("reusable") == "yes" ||
2117 split_queries[l]->query_plan[0]->get_param_tbl()->size() == 0) {
2118 lfta_reusable = true;
2120 lfta_reuse_options.push_back(lfta_reusable);
2122 // LFTA will inherit the liveness timeout specification from the containing query
2123 // it is too conservative as lfta are expected to spend less time per tuple
2126 // extract liveness timeout from query definition
2127 int liveness_timeout = atoi(split_queries[l]->query_plan[0]->get_val_of_def("liveness_timeout").c_str());
2128 if (!liveness_timeout) {
2129 // fprintf(stderr, "WARNING: No liveness timeout specified for lfta %s, using default value of %d\n",
2130 // split_queries[l]->query_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2131 liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2133 lfta_liveness_timeouts.push_back(liveness_timeout);
2135 // Add it to the schema
2136 table_def *td = split_queries[l]->get_output_tabledef();
2137 Schema->append_table(td);
2138 //printf("added lfta %d (%s)\n",l,split_queries[l]->query_name.c_str());
2143 // If the output is lfta-only, dump out the query name.
2144 if(split_queries.size() == 1 && !hfta_returned){
2145 if(output_query_names ){
2146 fprintf(query_name_output,"%s L\n",split_queries[0]->query_name.c_str());
2150 fprintf(stderr,"query name is %s\n",split_queries[0]->query_name.c_str());
2155 // output schema summary
2156 if(output_schema_summary){
2157 dump_summary(split_queries[0]);
2163 if(hfta_returned){ // query also has an HFTA component
2164 int hfta_nbr = split_queries.size()-1;
2166 hfta_list.push_back(split_queries[hfta_nbr]);
2168 // report on generated query names
2169 if(output_query_names){
2170 string hfta_name =split_queries[hfta_nbr]->query_name;
2171 fprintf(query_name_output,"%s H\n",hfta_name.c_str());
2172 for(l=0;l<hfta_nbr;++l){
2173 string lfta_name =split_queries[l]->query_name;
2174 fprintf(query_name_output,"%s L\n",lfta_name.c_str());
2178 // fprintf(stderr,"query names are ");
2179 // for(l=0;l<hfta_nbr;++l){
2180 // if(l>0) fprintf(stderr,",");
2181 // string fta_name =split_queries[l]->query_name;
2182 // fprintf(stderr," %s",fta_name.c_str());
2184 // fprintf(stderr,"\n");
2189 fprintf(stderr,"ERROR, query in file %s has a bug.\n",qnodes[i]->file.c_str());
2190 fprintf(stderr,"%s\n",rootq->get_error_str().c_str());
2197 //-----------------------------------------------------------------
2198 // Compute and propagate the SEs (scalar expressions) which compute PROTOCOL fields.
2199 //-----------------------------------------------------------------
2201 for(i=0;i<lfta_list.size();i++){
2202 lfta_list[i]->generate_protocol_se(sq_map, Schema);
2203 sq_map[lfta_list[i]->get_sq_name()] = lfta_list[i];
2205 for(i=0;i<hfta_list.size();i++){
2206 hfta_list[i]->generate_protocol_se(sq_map, Schema);
2207 sq_map[hfta_list[i]->get_sq_name()] = hfta_list[i];
2212 //------------------------------------------------------------------------
2213 // Perform individual FTA optimizations
2214 //-----------------------------------------------------------------------
2216 if (partitioned_mode) {
2218 // open partition definition file
2219 string part_fname = config_dir_path + "partition.txt";
2221 FILE* partfd = fopen(part_fname.c_str(), "r");
2223 fprintf(stderr,"ERROR, unable to open partition definition file %s\n",part_fname.c_str());
2226 PartnParser_setfileinput(partfd);
2227 if (PartnParserparse()) {
2228 fprintf(stderr,"ERROR, unable to parse partition definition file %s\n",part_fname.c_str());
2235 print_hftas("preopt_hfta_info.txt", hfta_list, lfta_names, registration_query_names, interface_names);
2237 int num_hfta = hfta_list.size();
2238 for(i=0; i < hfta_list.size(); ++i){
2239 hfta_list[i]->optimize(hfta_list, lfta_names, interface_names, machine_names, Ext_fcns, Schema, ifaces_db, partn_parse_result);
2242 // Add all new hftas to schema
2243 for(i=num_hfta; i < hfta_list.size(); ++i){
2244 table_def *td = hfta_list[i]->get_output_tabledef();
2245 Schema->append_table(td);
2248 print_hftas("postopt_hfta_info.txt", hfta_list, lfta_names, registration_query_names, interface_names);
2252 //------------------------------------------------------------------------
2253 // Do global (cross-fta) optimization
2254 //-----------------------------------------------------------------------
2261 set<string> extra_external_libs;
2263 for(i=0;i<hfta_list.size();++i){ // query also has an HFTA component
2266 // build hfta file name, create output
2267 if(numeric_hfta_flname){
2268 sprintf(tmpstr,"hfta_%d",hfta_count);
2269 hfta_names.push_back(tmpstr);
2270 sprintf(tmpstr,"hfta_%d.cc",hfta_count);
2272 sprintf(tmpstr,"hfta_%s",hfta_list[i]->query_name.c_str());
2273 hfta_names.push_back(tmpstr);
2274 sprintf(tmpstr,"hfta_%s.cc",hfta_list[i]->query_name.c_str());
2276 FILE *hfta_fl = fopen(tmpstr,"w");
2277 if(hfta_fl == NULL){
2278 fprintf(stderr,"ERROR can't open fta output file %s\n",tmpstr);
2281 fprintf(hfta_fl,"%s\n\n",hfta_list[i]->generate_hfta(Schema, Ext_fcns, opviews, distributed_mode).c_str());
2283 // If there is a field verifier, warn about
2284 // lack of compatibility
2285 // NOTE : this code assumes that visible non-lfta queries
2286 // are those at the root of a stream query.
2287 string hfta_comment;
2289 string hfta_namespace;
2290 if(hfta_list[i]->defines.count("comment")>0)
2291 hfta_comment = hfta_list[i]->defines["comment"];
2292 if(hfta_list[i]->defines.count("Comment")>0)
2293 hfta_comment = hfta_list[i]->defines["Comment"];
2294 if(hfta_list[i]->defines.count("COMMENT")>0)
2295 hfta_comment = hfta_list[i]->defines["COMMENT"];
2296 if(hfta_list[i]->defines.count("title")>0)
2297 hfta_title = hfta_list[i]->defines["title"];
2298 if(hfta_list[i]->defines.count("Title")>0)
2299 hfta_title = hfta_list[i]->defines["Title"];
2300 if(hfta_list[i]->defines.count("TITLE")>0)
2301 hfta_title = hfta_list[i]->defines["TITLE"];
2302 if(hfta_list[i]->defines.count("namespace")>0)
2303 hfta_namespace = hfta_list[i]->defines["namespace"];
2304 if(hfta_list[i]->defines.count("Namespace")>0)
2305 hfta_namespace = hfta_list[i]->defines["Namespace"];
2306 if(hfta_list[i]->defines.count("NAMESPACE")>0)
2307 hfta_namespace = hfta_list[i]->defines["NAMESPACE"];
2309 if(field_verifier != NULL){
2311 if(hfta_comment == "")
2312 warning_str += "\tcomment not found.\n";
2314 // Obsolete stuff that Carsten wanted
2315 // if(hfta_title == "")
2316 // warning_str += "\ttitle not found.\n";
2317 // if(hfta_namespace == "")
2318 // warning_str += "\tnamespace not found.\n";
2321 // There is a get_tbl_keys method implemented for qp_nodes,
2322 // integrate it into stream_query, then call it to find keys,
2323 // and annotate fields with their key-ness.
2324 // If there is a "keys" property in the defines block, override anything returned
2325 // from the automated analysis
2327 vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2329 for(fi=0;fi<flds.size();fi++){
2330 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
2332 if(warning_str != "")
2333 fprintf(stderr,"Warning, in HFTA stream %s:\n%s",
2334 hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(),warning_str.c_str());
2337 // Get the fields in this query
2338 vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2340 // do key processing
2341 string hfta_keys_s = "";
2342 if(hfta_list[i]->defines.count("keys")>0)
2343 hfta_keys_s = hfta_list[i]->defines["keys"];
2344 if(hfta_list[i]->defines.count("Keys")>0)
2345 hfta_keys_s = hfta_list[i]->defines["Keys"];
2346 if(hfta_list[i]->defines.count("KEYS")>0)
2347 hfta_keys_s = hfta_list[i]->defines["KEYS"];
2348 string xtra_keys_s = "";
2349 if(hfta_list[i]->defines.count("extra_keys")>0)
2350 xtra_keys_s = hfta_list[i]->defines["extra_keys"];
2351 if(hfta_list[i]->defines.count("Extra_Keys")>0)
2352 xtra_keys_s = hfta_list[i]->defines["Extra_Keys"];
2353 if(hfta_list[i]->defines.count("EXTRA_KEYS")>0)
2354 xtra_keys_s = hfta_list[i]->defines["EXTRA_KEYS"];
2356 vector<string> hfta_keys;
2357 vector<string> partial_keys;
2358 vector<string> xtra_keys;
2359 if(hfta_keys_s==""){
2360 hfta_keys = hfta_list[i]->get_tbl_keys(partial_keys);
2361 if(xtra_keys_s.size()>0){
2362 xtra_keys = split_string(xtra_keys_s, ',');
2364 for(int xi=0;xi<xtra_keys.size();++xi){
2365 if(find(hfta_keys.begin(), hfta_keys.end(), xtra_keys[xi]) == hfta_keys.end()){
2366 hfta_keys.push_back(xtra_keys[xi]);
2370 hfta_keys = split_string(hfta_keys_s, ',');
2372 // validate that all of the keys exist in the output.
2373 // (exit on error, as it's a bad specification)
2374 vector<string> missing_keys;
2375 for(int ki=0;ki<hfta_keys.size(); ++ki){
2377 for(fi=0;fi<flds.size();++fi){
2378 if(hfta_keys[ki] == flds[fi]->get_name())
2382 missing_keys.push_back(hfta_keys[ki]);
2384 if(missing_keys.size()>0){
2385 fprintf(stderr, "Error, HFTA stream %s has keys defined which aren't in the output:", hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
2386 for(int hi=0; hi<missing_keys.size(); ++hi){
2387 fprintf(stderr," %s", missing_keys[hi].c_str());
2389 fprintf(stderr,"\n");
2393 fprintf(qtree_output,"\t<HFTA name='%s'>\n",hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
2394 if(hfta_comment != "")
2395 fprintf(qtree_output,"\t\t<Description value='%s' />\n",hfta_comment.c_str());
2396 if(hfta_title != "")
2397 fprintf(qtree_output,"\t\t<Title value='%s' />\n",hfta_title.c_str());
2398 if(hfta_namespace != "")
2399 fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",hfta_namespace.c_str());
2400 fprintf(qtree_output,"\t\t<FileName value='%s' />\n",tmpstr);
2401 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2403 // write info about fields to qtree.xml
2405 for(fi=0;fi<flds.size();fi++){
2406 fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
2407 if(flds[fi]->get_modifier_list()->size()){
2408 fprintf(qtree_output,"mods='%s'", flds[fi]->get_modifier_list()->to_string().c_str());
2410 fprintf(qtree_output," />\n");
2413 for(int hi=0;hi<hfta_keys.size();++hi){
2414 fprintf(qtree_output, "\t\t<Key value='%s'/>\n",hfta_keys[hi].c_str());
2416 for(int hi=0;hi<partial_keys.size();++hi){
2417 fprintf(qtree_output, "\t\t<PartialKey value='%s'/>\n",partial_keys[hi].c_str());
2419 for(int hi=0;hi<xtra_keys.size();++hi){
2420 fprintf(qtree_output, "\t\t<ExtraKey value='%s'/>\n",xtra_keys[hi].c_str());
2424 // extract liveness timeout from query definition
2425 int liveness_timeout = atoi(hfta_list[i]->query_plan[hfta_list[i]->qhead]->get_val_of_def("liveness_timeout").c_str());
2426 if (!liveness_timeout) {
2427 // fprintf(stderr, "WARNING: No liveness timeout specified for hfta %s, using default value of %d\n",
2428 // hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2429 liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2431 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", liveness_timeout);
2433 vector<tablevar_t *> tmp_tv = hfta_list[i]->get_input_tables();
2435 for(itv=0;itv<tmp_tv.size();++itv){
2436 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",tmp_tv[itv]->get_schema_name().c_str());
2438 string ifrs = hfta_list[i]->collect_refd_ifaces();
2440 fprintf(qtree_output,"\t\t<Ifaces_referenced value='%s'/>\n",ifrs.c_str());
2442 fprintf(qtree_output,"\t</HFTA>\n");
2446 // debug only -- do code generation to catch generation-time errors.
2447 hfta_list[i]->generate_hfta(Schema, Ext_fcns,opviews, distributed_mode);
2450 hfta_count++; // for hfta file names with numeric suffixes
2452 hfta_list[i]->get_external_libs(extra_external_libs);
2456 string ext_lib_string;
2457 set<string>::iterator ssi_el;
2458 for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_el)
2459 ext_lib_string += (*ssi_el)+" ";
2463 // Report on the set of operator views
2464 for(i=0;i<opviews.size();++i){
2465 opview_entry *opve = opviews.get_entry(i);
2466 fprintf(qtree_output,"\t<UDOP name='%s' >\n",opve->view_name.c_str());
2467 fprintf(qtree_output,"\t\t<FileName value='%s' />\n",opve->exec_fl.c_str());
2468 fprintf(qtree_output,"\t\t<Parent value='%s' />\n",opve->root_name.c_str());
2469 fprintf(qtree_output,"\t\t<Alias value='%s' />\n",opve->udop_alias.c_str());
2470 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2472 if (!opve->liveness_timeout) {
2473 // fprintf(stderr, "WARNING: No liveness timeout specified for view %s, using default value of %d\n",
2474 // opve->view_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2475 opve->liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2477 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", opve->liveness_timeout);
2479 for(j=0;j<opve->subq_names.size();j++)
2480 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",opve->subq_names[j].c_str());
2481 fprintf(qtree_output,"\t</UDOP>\n");
2485 //-----------------------------------------------------------------
2487 // Create interface-specific meta code files.
2488 // first, open and parse the interface resources file.
2489 ifaces_db = new ifq_t();
2491 if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
2492 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
2493 ifx_fname.c_str(), ierr.c_str());
2497 map<string, vector<stream_query *> >::iterator svsi;
2498 for(svsi=lfta_mach_lists.begin(); svsi!=lfta_mach_lists.end(); ++svsi){
2499 string lmach = (*svsi).first;
2501 // For this machine, create a set of lftas per interface.
2502 vector<stream_query *> mach_lftas = (*svsi).second;
2503 map<string, vector<stream_query *> > lfta_iface_lists;
2505 for(li=0;li<mach_lftas.size();++li){
2506 vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
2507 string lfta_iface = "_local_";
2509 string lfta_iface = tvec[0]->get_interface();
2511 lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2514 map<string, vector<stream_query *> >::iterator lsvsi;
2515 for(lsvsi=lfta_iface_lists.begin(); lsvsi!=lfta_iface_lists.end(); ++lsvsi){
2517 string liface = (*lsvsi).first;
2518 vector<stream_query *> iface_lftas = (*lsvsi).second;
2519 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2520 if(iface_codegen_type.size()){
2521 nic_property *nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2523 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2526 string mcs = generate_nic_code(iface_lftas, nicprop);
2529 mcf_flnm = lmach + "_"+liface+".mcf";
2531 mcf_flnm = hostname + "_"+liface+".mcf";
2533 if((mcf_fl = fopen(mcf_flnm.c_str(),"w")) == NULL){
2534 fprintf(stderr,"Can't open output file %s\n%s\n",mcf_flnm.c_str(),strerror(errno));
2537 fprintf(mcf_fl,"%s",mcs.c_str());
2539 //printf("mcs of machine %s, iface %s of type %s is \n%s\n",
2540 //lmach.c_str(), liface.c_str(),iface_codegen_type[0].c_str(), mcs.c_str());
2549 //-----------------------------------------------------------------
2552 // Find common filter predicates in the LFTAs.
2553 // in addition generate structs to store the temporal attributes unpacked by prefilter
2555 map<string, vector<stream_query *> >::iterator ssqi;
2556 for(ssqi=lfta_mach_lists.begin(); ssqi!=lfta_mach_lists.end(); ++ssqi){
2558 string lmach = (*ssqi).first;
2559 bool packed_return = false;
2563 // The LFTAs of this machine.
2564 vector<stream_query *> mach_lftas = (*ssqi).second;
2565 // break up on a per-interface basis.
2566 map<string, vector<stream_query *> > lfta_iface_lists;
2567 map<string, vector<int> > lfta_iface_qname_ix; // need the query name
2569 for(li=0;li<mach_lftas.size();++li){
2570 vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
2571 string lfta_iface = "_local_";
2573 lfta_iface = tvec[0]->get_interface();
2575 lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2576 lfta_iface_qname_ix[lfta_iface].push_back(mach_query_names[lmach][li]);
2580 // Are the return values "packed"?
2581 // This should be done on a per-interface basis.
2582 // But this is defunct code for gs-lite
2583 for(li=0;li<mach_lftas.size();++li){
2584 vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
2585 string liface = "_local_";
2587 liface = tvec[0]->get_interface();
2589 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2590 if(iface_codegen_type.size()){
2591 if(npdb->get_nic_prop_val(iface_codegen_type[0],"Return",erri) == "Packed"){
2592 packed_return = true;
2598 // Separate lftas by interface, collect results on a per-interface basis.
2600 vector<cnf_set *> no_preds; // fallback if there is no prefilter
2601 map<string, vector<cnf_set *> > prefilter_preds;
2602 set<unsigned int> pred_ids; // this can be global for all interfaces
2603 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2604 string liface = (*mvsi).first;
2605 vector<cnf_set *> empty_list;
2606 prefilter_preds[liface] = empty_list;
2607 if(! packed_return){
2608 get_common_lfta_filter(lfta_iface_lists[liface], Schema,Ext_fcns, prefilter_preds[liface], pred_ids);
2611 // get NIC capabilities. (Is this needed?)
2612 nic_property *nicprop = NULL;
2613 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2614 if(iface_codegen_type.size()){
2615 nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2617 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2624 // Now that we know the prefilter preds, generate the lfta code.
2625 // Do this for all lftas in this machine.
2626 for(li=0;li<mach_lftas.size();++li){
2627 set<unsigned int> subsumed_preds;
2628 set<unsigned int>::iterator sii;
2630 for(sii=pred_ids.begin();sii!=pred_ids.end();++sii){
2632 if((pid>>16) == li){
2633 subsumed_preds.insert(pid & 0xffff);
2637 string lfta_schema_str = mach_lftas[li]->make_schema();
2638 string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
2639 nic_property *nicprop = NULL; // no NIC properties?
2640 lfta_val[lmach] += generate_lfta_block(mach_lftas[li]->query_plan[0], Schema, mach_lftas[li]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop,subsumed_preds);
2644 // generate structs to store the temporal attributes
2645 // unpacked by prefilter
2646 col_id_set temp_cids;
2647 get_prefilter_temporal_cids(lfta_mach_lists[lmach], temp_cids);
2648 lfta_prefilter_val[lmach] = generate_lfta_prefilter_struct(temp_cids, Schema);
2650 // Compute the lfta bit signatures and the lfta colrefs
2651 // do this on a per-interface basis
2653 lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2655 map<string, vector<long long int> > lfta_sigs; // used again later
2656 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2657 string liface = (*mvsi).first;
2658 vector<long long int> empty_list;
2659 lfta_sigs[liface] = empty_list;
2661 vector<col_id_set> lfta_cols;
2662 vector<int> lfta_snap_length;
2663 for(li=0;li<lfta_iface_lists[liface].size();++li){
2664 unsigned long long int mask=0, bpos=1;
2666 for(f_pos=0;f_pos<prefilter_preds[liface].size();++f_pos){
2667 if(prefilter_preds[liface][f_pos]->lfta_id.count(li))
2671 lfta_sigs[liface].push_back(mask);
2672 lfta_cols.push_back(lfta_iface_lists[liface][li]->query_plan[0]->get_colrefs(true,Schema));
2673 lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema));
2676 //for(li=0;li<mach_lftas.size();++li){
2677 //printf("lfta %d, mask is %llu\n",li,lfta_sigs[li]);
2678 //col_id_set::iterator tcisi;
2679 //for(tcisi=lfta_cols[li].begin(); tcisi!=lfta_cols[li].end();++tcisi){
2680 //printf("\t%s %d (%d)\n",(*tcisi).field.c_str(),(*tcisi).schema_ref, (*tcisi).tblvar_ref);
2685 // generate the prefilter
2686 // Do this on a per-interface basis, except for the #define
2688 // lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2689 lfta_val[lmach] += generate_lfta_prefilter(prefilter_preds[liface], temp_cids, Schema, Ext_fcns, lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2691 lfta_val[lmach] += generate_lfta_prefilter(no_preds, temp_cids, Schema,Ext_fcns, lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2696 // Generate interface parameter lookup function
2697 lfta_val[lmach] += "// lookup interface properties by name\n";
2698 lfta_val[lmach] += "// mulitple values of given property returned in the same string separated by commas\n";
2699 lfta_val[lmach] += "// returns NULL if given property does not exist\n";
2700 lfta_val[lmach] += "gs_sp_t get_iface_properties (const gs_sp_t iface_name, const gs_sp_t property_name) {\n";
2702 // collect a list of interface names used by queries running on this host
2703 set<std::string> iface_names;
2704 for(i=0;i<mach_query_names[lmach].size();i++){
2705 int mi = mach_query_names[lmach][i];
2706 stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2708 if(interface_names[mi]=="")
2709 iface_names.insert("DEFAULTDEV");
2711 iface_names.insert(interface_names[mi]);
2714 // generate interface property lookup code for every interface
2715 set<std::string>::iterator sir;
2716 for (sir = iface_names.begin(); sir != iface_names.end(); sir++) {
2717 if (sir == iface_names.begin())
2718 lfta_val[lmach] += "\tif (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2720 lfta_val[lmach] += "\t} else if (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2722 // iterate through interface properties
2723 vector<string> iface_properties;
2724 if(*sir!="_local_"){ // dummy watchlist interface, don't process.
2725 iface_properties = ifaces_db->get_iface_properties(lmach,*sir,erri,err_str);
2728 fprintf(stderr,"ERROR cannot retrieve interface properties for %s.%s, %s\n",lmach.c_str(), sir->c_str(), err_str.c_str());
2731 if (iface_properties.empty())
2732 lfta_val[lmach] += "\t\treturn NULL;\n";
2734 for (int i = 0; i < iface_properties.size(); ++i) {
2736 lfta_val[lmach] += "\t\tif (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2738 lfta_val[lmach] += "\t\t} else if (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2740 // combine all values for the interface property using comma separator
2741 vector<string> vals = ifaces_db->get_iface_vals(lmach,*sir,iface_properties[i],erri,err_str);
2742 lfta_val[lmach] += "\t\t\treturn \"";
2743 for (int j = 0; j < vals.size(); ++j) {
2744 lfta_val[lmach] += vals[j];
2745 if (j != vals.size()-1)
2746 lfta_val[lmach] += ",";
2748 lfta_val[lmach] += "\";\n";
2750 lfta_val[lmach] += "\t\t} else\n";
2751 lfta_val[lmach] += "\t\t\treturn NULL;\n";
2754 lfta_val[lmach] += "\t} else\n";
2755 lfta_val[lmach] += "\t\treturn NULL;\n";
2756 lfta_val[lmach] += "}\n\n";
2759 // Generate a full list of FTAs for clearinghouse reference
2760 lfta_val[lmach] += "// list of FTAs clearinghouse expects to register themselves\n";
2761 lfta_val[lmach] += "const gs_sp_t fta_names[] = {";
2764 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2765 string liface = (*mvsi).first;
2766 if(liface != "_local_"){ // these don't register themselves
2767 vector<stream_query *> lfta_list = (*mvsi).second;
2768 for(i=0;i<lfta_list.size();i++){
2769 int mi = lfta_iface_qname_ix[liface][i];
2770 if(first) first = false;
2771 else lfta_val[lmach] += ", ";
2772 lfta_val[lmach] += "\"" + registration_query_names[mi] + "\"";
2776 // for (i = 0; i < registration_query_names.size(); ++i) {
2778 // lfta_val[lmach] += ", ";
2779 // lfta_val[lmach] += "\"" + registration_query_names[i] + "\"";
2782 for (i = 0; i < hfta_list.size(); ++i) {
2783 lfta_val[lmach] += ", \"" + hfta_list[i]->query_name + "\"";
2785 lfta_val[lmach] += ", NULL};\n\n";
2788 // Add the initialization function to lfta.c
2789 // Change to accept the interface name, and
2790 // set the prefilter function accordingly.
2791 // see the example in demo/err2
2792 lfta_val[lmach] += "void fta_init(gs_sp_t device){\n";
2793 lfta_val[lmach] += "// note: the last parameter in fta_register is the prefilter signature\n";
2795 // for(i=0;i<mach_query_names[lmach].size();i++)
2796 // int mi = mach_query_names[lmach][i];
2797 // stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2799 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2800 string liface = (*mvsi).first;
2801 vector<stream_query *> lfta_list = (*mvsi).second;
2802 for(i=0;i<lfta_list.size();i++){
2803 stream_query *lfta_sq = lfta_list[i];
2804 int mi = lfta_iface_qname_ix[liface][i];
2806 if(liface == "_local_"){
2807 // Don't register an init function, do the init code inline
2808 lfta_val[lmach] += "// initialization code for "+registration_query_names[mi]+";\n";
2809 lfta_val[lmach] += "\t"+generate_watchlist_name(registration_query_names[mi])+".filename = "+generate_watchlist_name(registration_query_names[mi])+"__fstr;\n";
2813 fprintf(stderr,"interface %s, LFTA %d, snap length is %d\n",liface.c_str(),mi,snap_lengths[mi]);
2815 string this_iface = "DEFAULTDEV";
2816 if(interface_names[mi]!="")
2817 this_iface = '"'+interface_names[mi]+'"';
2818 lfta_val[lmach] += "\tif(!strcmp(device,"+this_iface+"))\n";
2819 lfta_val[lmach] += "\t\tfta_register(\""+registration_query_names[mi]+"\", " + (lfta_reuse_options[mi]?"1":"0") + ", ";
2820 // if(interface_names[mi]=="")
2821 // lfta_val[lmach]+="DEFAULTDEV";
2823 // lfta_val[lmach]+='"'+interface_names[mi]+'"';
2824 lfta_val[lmach] += this_iface;
2827 lfta_val[lmach] += ", "+generate_alloc_name(registration_query_names[mi])
2828 +"\n#ifndef LFTA_IN_NIC\n\t,"+generate_schema_string_name(registration_query_names[mi])
2830 sprintf(tmpstr,",%d",snap_lengths[mi]);
2831 lfta_val[lmach] += tmpstr;
2833 // unsigned long long int mask=0, bpos=1;
2835 // for(f_pos=0;f_pos<prefilter_preds.size();++f_pos){
2836 // if(prefilter_preds[f_pos]->lfta_id.count(i))
2838 // bpos = bpos << 1;
2842 // sprintf(tmpstr,",%lluull",mask);
2843 sprintf(tmpstr,",%lluull",lfta_sigs[liface][i]);
2844 lfta_val[lmach]+=tmpstr;
2846 lfta_val[lmach] += ",0ull";
2849 lfta_val[lmach] += ");\n";
2853 // End of lfta prefilter stuff
2854 // --------------------------------------------------
2856 // If there is a field verifier, warn about
2857 // lack of compatibility
2858 string lfta_comment;
2860 string lfta_namespace;
2861 map<string,string> ldefs = lfta_sq->query_plan[0]->get_definitions();
2862 if(ldefs.count("comment")>0)
2863 lfta_comment = lfta_sq->defines["comment"];
2864 if(ldefs.count("Comment")>0)
2865 lfta_comment = lfta_sq->defines["Comment"];
2866 if(ldefs.count("COMMENT")>0)
2867 lfta_comment = lfta_sq->defines["COMMENT"];
2868 if(ldefs.count("title")>0)
2869 lfta_title = lfta_sq->defines["title"];
2870 if(ldefs.count("Title")>0)
2871 lfta_title = lfta_sq->defines["Title"];
2872 if(ldefs.count("TITLE")>0)
2873 lfta_title = lfta_sq->defines["TITLE"];
2874 if(ldefs.count("NAMESPACE")>0)
2875 lfta_namespace = lfta_sq->defines["NAMESPACE"];
2876 if(ldefs.count("Namespace")>0)
2877 lfta_namespace = lfta_sq->defines["Namespace"];
2878 if(ldefs.count("namespace")>0)
2879 lfta_namespace = lfta_sq->defines["namespace"];
2881 string lfta_ht_size;
2882 if(lfta_sq->query_plan[0]->node_type()== "sgah_qpn" || lfta_sq->query_plan[0]->node_type()== "rsgah_qpn")
2883 lfta_ht_size = int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE);
2884 if(ldefs.count("aggregate_slots")>0){
2885 lfta_ht_size = ldefs["aggregate_slots"];
2888 // NOTE : I'm assuming that visible lftas do not start with _fta.
2889 // -- will fail for non-visible simple selection queries.
2890 if(field_verifier != NULL && registration_query_names[mi].substr(0,1) != "_"){
2892 if(lfta_comment == "")
2893 warning_str += "\tcomment not found.\n";
2894 // Obsolete stuff that carsten wanted
2895 // if(lfta_title == "")
2896 // warning_str += "\ttitle not found.\n";
2897 // if(lfta_namespace == "")
2898 // warning_str += "\tnamespace not found.\n";
2900 vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
2902 for(fi=0;fi<flds.size();fi++){
2903 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
2905 if(warning_str != "")
2906 fprintf(stderr,"Warning, in LFTA stream %s:\n%s",
2907 registration_query_names[mi].c_str(),warning_str.c_str());
2911 // Create qtree output
2912 fprintf(qtree_output,"\t<LFTA name='%s' >\n",registration_query_names[mi].c_str());
2913 if(lfta_comment != "")
2914 fprintf(qtree_output,"\t\t<Description value='%s' />\n",lfta_comment.c_str());
2915 if(lfta_title != "")
2916 fprintf(qtree_output,"\t\t<Title value='%s' />\n",lfta_title.c_str());
2917 if(lfta_namespace != "")
2918 fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",lfta_namespace.c_str());
2919 if(lfta_ht_size != "")
2920 fprintf(qtree_output,"\t\t<HtSize value='%s' />\n",lfta_ht_size.c_str());
2922 fprintf(qtree_output,"\t\t<Host value='%s' />\n",lmach.c_str());
2924 fprintf(qtree_output,"\t\t<Host value='%s' />\n",hostname.c_str());
2925 fprintf(qtree_output,"\t\t<Interface value='%s' />\n",interface_names[mi].c_str());
2926 std::vector<tablevar_t *> itbls = lfta_sq->get_input_tables();
2927 for(int t=0;t<itbls.size();++t){
2928 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",itbls[t]->get_schema_name().c_str());
2930 // fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",interface_names[mi].c_str());
2931 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2932 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", lfta_liveness_timeouts[mi]);
2933 // write info about fields to qtree.xml
2934 vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
2936 for(fi=0;fi<flds.size();fi++){
2937 fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
2938 if(flds[fi]->get_modifier_list()->size()){
2939 fprintf(qtree_output,"mods='%s' ", flds[fi]->get_modifier_list()->to_string().c_str());
2941 fprintf(qtree_output," />\n");
2943 fprintf(qtree_output,"\t</LFTA>\n");
2949 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2950 string liface = (*mvsi).first;
2952 " if (!strcmp(device, \""+liface+"\")) \n"
2953 " lfta_prefilter = &lfta_prefilter_"+liface+"; \n"
2957 " if(lfta_prefilter == NULL){\n"
2958 " fprintf(stderr, \"Error, interface %s not found in fta_init.\\n\",device);\n"
2965 lfta_val[lmach] += "}\n\n";
2967 if(!(debug_only || hfta_only) ){
2970 lfta_flnm = lmach + "_lfta.c";
2972 lfta_flnm = hostname + "_lfta.c";
2973 if((lfta_out = fopen(lfta_flnm.c_str(),"w")) == NULL){
2974 fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
2977 fprintf(lfta_out,"%s",lfta_header.c_str());
2978 fprintf(lfta_out,"%s",lfta_prefilter_val[lmach].c_str());
2979 fprintf(lfta_out,"%s",lfta_val[lmach].c_str());
2984 // Say what are the operators which must execute
2985 if(opviews.size()>0)
2986 fprintf(stderr,"The queries use the following external operators:\n");
2987 for(i=0;i<opviews.size();++i){
2988 opview_entry *opv = opviews.get_entry(i);
2989 fprintf(stderr,"\t%s\n",opv->view_name.c_str());
2993 generate_makefile(input_file_names, nfiles, hfta_names, opviews,
2994 machine_names, schema_file_name,
2996 ifaces_db, config_dir_path,use_pads,ext_lib_string, rts_hload);
2999 fprintf(qtree_output,"</QueryNodes>\n");
3004 ////////////////////////////////////////////////////////////
// Emit the build and control artifacts for the compiled query set:
//   - Makefile : builds the rts executable (lfta.o + GS runtime libs) and
//                one executable per hfta, plus regeneration/clean rules.
//   - runit    : shell script that starts gshub.py, rts, every hfta, and
//                any external operator views, recording pids in gs.pids.
//   - stopit   : shell script that kills the recorded process groups.
// NOTE(review): this listing has gaps (the embedded original line numbers
// jump), so several statements -- NULL checks after fopen, fputs headers,
// closing braces -- are not visible here.  Comments describe only what the
// visible lines show.
3006 void generate_makefile(vector<string> &input_file_names, int nfiles,
3007 vector<string> &hfta_names, opview_set &opviews,
3008 vector<string> &machine_names,
3009 string schema_file_name,
3010 vector<string> &interface_names,
3011 ifq_t *ifdb, string &config_dir_path,
3014 map<string, vector<int> > &rts_hload
// If a config directory was supplied, turn it into the "-C <dir>" flag
// passed to translate_fta in the regeneration rule below.
3018 if(config_dir_path != ""){
3019 config_dir_path = "-C "+config_dir_path;
// Probe for optional static libraries; stat() >= 0 means the file exists.
3023 bool libz_exists = stat((root_path+"/lib/libz.a").c_str(),&sb) >= 0;
3024 bool libast_exists = stat((root_path+"/lib/libast.a").c_str(),&sb) >= 0;
3026 // if(libz_exists && !libast_exists){
3027 // fprintf(stderr,"Configuration ERROR: ../../lib/libz.a exists but ../../lib/libast.a does not.\n");
3031 // Get set of operator executable files to run
3033 set<string>::iterator ssi;
3034 for(i=0;i<opviews.size();++i){
3035 opview_entry *opv = opviews.get_entry(i);
3036 if(opv->exec_fl != "") op_fls.insert(opv->exec_fl);
// Open the Makefile for writing (the fopen NULL-check line is not
// visible in this listing; the error message below belongs to it).
3039 FILE *outfl = fopen("Makefile", "w");
3041 fprintf(stderr,"Can't open Makefile for write, exiting.\n");
// Compiler definitions written at the top of the Makefile: CPP for the
// hfta C++ builds, CC for the generated lfta C code.
3046 ("CPP= g++ -O3 -g -I "+root_path+"/include -I "+root_path+"/include/hfta\n"
3047 "CC= gcc -O3 -g -I . -I "+root_path+"/include -I "+root_path+"/include/lfta"
3051 fprintf(outfl," -DLFTA_STATS");
3053 // Gather the set of interfaces
3054 // Also, gather "base interface names" for use in computing
3055 // the hash splitting to virtual interfaces.
3056 // TODO : must update to handle machines
3058 set<string> base_vifaces; // base interfaces of virtual interfaces
3059 map<string, string> ifmachines;
3060 map<string, string> ifattrs;
// Record each interface and its host machine; an interface name of the
// form "<base>X<n>" (split at the last 'X') is treated as a virtual
// interface and its base name is collected separately.
3061 for(i=0;i<interface_names.size();++i){
3062 ifaces.insert(interface_names[i]);
3063 ifmachines[interface_names[i]] = machine_names[i];
3065 size_t Xpos = interface_names[i].find_last_of("X");
3066 if(Xpos!=string::npos){
3067 string iface = interface_names[i].substr(0,Xpos);
3068 base_vifaces.insert(iface);
3070 // get interface attributes and add them to the list
3073 // Do we need to include protobuf libraries?
3074 // TODO Move to the interface library: get the libraries to include
3075 // for an interface type
// Feature flags derived from interface properties (PROTO / BSA / KAFKA);
// they control which extra libraries the link rule pulls in.
3077 bool use_proto = false;
3078 bool use_bsa = false;
3079 bool use_kafka = false;
// Query the interface DB for each interface's properties and warn when
// the runtime libraries were built without the matching support.
3082 for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
3083 string ifnm = (*ssi);
3084 vector<string> ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "InterfaceType", erri, err_str);
3085 for(int ift_i=0;ift_i<ift.size();ift_i++){
3086 if(ift[ift_i]=="PROTO"){
3087 #ifdef PROTO_ENABLED
3090 fprintf(stderr,"Runtime libraries built without PROTO support. Rebuild with PROTO_ENABLED defined in gsoptions.h\n");
3095 ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "BSA", erri, err_str);
3096 for(int ift_i=0;ift_i<ift.size();ift_i++){
3097 if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){
3101 fprintf(stderr,"Runtime libraries built without BSA support. Rebuild with BSA_ENABLED defined in gsoptions.h\n");
3106 ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "KAFKA", erri, err_str);
3107 for(int ift_i=0;ift_i<ift.size();ift_i++){
3108 if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){
3109 #ifdef KAFKA_ENABLED
3112 fprintf(stderr,"Runtime libraries built without KAFKA support. Rebuild with KAFKA_ENABLED defined in gsoptions.h\n");
// List every hfta executable as a prerequisite (the target header line
// itself is not visible in this listing).
3123 for(i=0;i<hfta_names.size();++i)
3124 fprintf(outfl," %s",hfta_names[i].c_str());
// rts link rule: link lfta.o against the GS runtime/host/clearinghouse
// libraries, appending optional libraries per the flags computed above.
3128 "rts: lfta.o "+root_path+"/lib/libgscphost.a "+root_path+"/lib/libgscplftaaux.a "+root_path+"/lib/libgscprts.a "+root_path+"/lib/libclearinghouse.a\n"
3129 "\tg++ -O3 -g -o rts lfta.o ").c_str(), outfl);
3131 fprintf(outfl,"-L. ");
3133 ("-L"+root_path+"/lib -lgscplftaaux ").c_str(), outfl);
3135 fprintf(outfl,"-lgscppads -lpads ");
3137 "-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux -lclearinghouse -lresolv -lpthread -lgscpinterface -lz");
3139 fprintf(outfl, " -lpz -lz -lbz ");
3140 if(libz_exists && libast_exists)
3141 fprintf(outfl," -last ");
3143 fprintf(outfl, " -ldll -ldl ");
3145 #ifdef PROTO_ENABLED
3146 fprintf(outfl, " -L/usr/local/lib/ -lprotobuf-c ");
3149 fprintf(outfl, " -lbsa_stream ");
3151 #ifdef KAFKA_ENABLED
3152 fprintf(outfl, " -lrdkafka ");
3154 fprintf(outfl," -lgscpaux");
3156 fprintf(outfl," -fprofile-arcs");
// Rules to compile lfta.o and to regenerate <host>_lfta.c by re-running
// translate_fta over the schema and the original query files.
3161 "lfta.o: %s_lfta.c\n"
3162 "\t$(CC) -o lfta.o -c %s_lfta.c\n"
3164 "%s_lfta.c: external_fcns.def %s ",hostname.c_str(), hostname.c_str(), hostname.c_str(),schema_file_name.c_str());
3165 for(i=0;i<nfiles;++i)
3166 fprintf(outfl," %s",input_file_names[i].c_str());
3168 fprintf(outfl,"\n\t%s/bin/translate_fta %s %s ",root_path.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
3170 fprintf(outfl,"\n\t%s/bin/translate_fta -h %s %s %s ", root_path.c_str(), hostname.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
3172 for(i=0;i<nfiles;++i)
3173 fprintf(outfl," %s",input_file_names[i].c_str());
3174 fprintf(outfl,"\n");
// Per-hfta link and compile rules; the format string consumes the hfta
// name repeatedly plus any extra libraries.
3176 for(i=0;i<hfta_names.size();++i)
3179 "\t$(CPP) -o %s %s.o -L"+root_path+"/lib -lgscpapp -lgscphostaux -lgscphost -lgscpinterface -lgscphftaaux -lgscphostaux -lm -lgscpaux -lclearinghouse -lresolv -lpthread -lgscpaux -lgscphftaaux -lgscpaux %s\n"
3182 "\t$(CPP) -o %s.o -c %s.cc\n"
3185 hfta_names[i].c_str(), hfta_names[i].c_str(),
3186 hfta_names[i].c_str(), hfta_names[i].c_str(), extra_libs.c_str(),
3187 hfta_names[i].c_str(), hfta_names[i].c_str(),
3188 hfta_names[i].c_str(), hfta_names[i].c_str()
// Convenience rules: symlink packet_schema.txt and external_fcns.def from
// the cfg directory, and a clean rule that removes generated artifacts.
3193 "packet_schema.txt:\n"
3194 "\tln -s "+root_path+"/cfg/packet_schema.txt .\n"
3196 "external_fcns.def:\n"
3197 "\tln -s "+root_path+"/cfg/external_fcns.def .\n"
3200 "\trm -rf core rts *.o %s_lfta.c external_fcns.def packet_schema.txt").c_str(),hostname.c_str());
3201 for(i=0;i<hfta_names.size();++i)
3202 fprintf(outfl," %s %s.cc",hfta_names[i].c_str(),hfta_names[i].c_str());
3203 fprintf(outfl,"\n");
3209 // Gather the set of interfaces
3210 // TODO : must update to handle machines
3211 // TODO : lookup interface attributes and add them as a parameter to rts process
// Generate the "runit" startup script: launch gshub.py, read its address
// from gshub.log, then start rts with each interface's Command arguments,
// every hfta, and the external operator views.
3212 outfl = fopen("runit", "w");
3214 fprintf(stderr,"Can't open runit for write, exiting.\n");
3222 +root_path+"/bin/gshub.py> /dev/null 2>&1 &\n"
3224 "if [ ! -f gshub.log ]\n"
3226 "\techo \"Failed to start bin/gshub.py\"\n"
3229 "ADDR=`cat gshub.log`\n"
3230 "ps opgid= $! >> gs.pids\n"
3231 "./rts $ADDR default ").c_str(), outfl);
3234 for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
3235 string ifnm = (*ssi);
3236 // suppress internal _local_ interface
3237 if (ifnm == "_local_")
3239 fprintf(outfl, "%s ",ifnm.c_str());
3240 vector<string> ifv = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "Command", erri, err_str);
3241 for(j=0;j<ifv.size();++j)
3242 fprintf(outfl, "%s ",ifv[j].c_str());
3244 fprintf(outfl, " &\n");
3245 fprintf(outfl, "echo $! >> gs.pids\n");
3246 for(i=0;i<hfta_names.size();++i)
3247 fprintf(outfl,"./%s $ADDR default &\n",hfta_names[i].c_str());
3249 for(j=0;j<opviews.opview_list.size();++j){
3250 fprintf(outfl,"%s/views/%s %s &\n",root_path.c_str(),opviews.opview_list[j]->exec_fl.c_str(), opviews.opview_list[j]->mangler.c_str());
3254 system("chmod +x runit");
// Generate the "stopit" shutdown script: kill every process group id
// recorded in gs.pids.
3256 outfl = fopen("stopit", "w");
3258 fprintf(stderr,"Can't open stopit for write, exiting.\n");
3262 fprintf(outfl,"#!/bin/sh\n"
3264 "if [ ! -f gs.pids ]\n"
3268 "for pgid in `cat gs.pids`\n"
3270 "kill -TERM -$pgid\n"
3273 "for pgid in `cat gs.pids`\n"
3280 system("chmod +x stopit");
3282 //-----------------------------------------------
3284 /* For now disable support for virtual interfaces
3285 outfl = fopen("set_vinterface_hash.bat", "w");
3287 fprintf(stderr,"Can't open set_vinterface_hash.bat for write, exiting.\n");
3291 // The format should be determined by an entry in the ifres.xml file,
3292 // but for now hardcode the only example I have.
3293 for(ssi=base_vifaces.begin();ssi!=base_vifaces.end();++ssi){
3294 if(rts_hload.count((*ssi))){
3295 string iface_name = (*ssi);
3296 string iface_number = "";
3297 for(j=iface_name.size()-1; j>0 && iface_number == ""; j--){
3298 if(isdigit(iface_name[j])){
3299 iface_number = iface_name[j];
3300 if(j>0 && isdigit(iface_name[j-1]))
3301 iface_number = iface_name[j-1] + iface_number;
3305 fprintf(outfl,"dagconfig -d%s -S hat_range=",iface_number.c_str());
3306 vector<int> halloc = rts_hload[iface_name];
3308 for(j=0;j<halloc.size();++j){
3311 fprintf(outfl,"%d-%d",prev_limit,halloc[j]);
3312 prev_limit = halloc[j];
3314 fprintf(outfl,"\n");
3318 system("chmod +x set_vinterface_hash.bat");
3322 // Code for implementing a local schema
// NOTE(review): fragment -- the enclosing function's header falls in a
// gap of this listing, so the names hfta_nbr / split_queries / Schema are
// presumably its parameters or locals; confirm against the full source.
// The visible logic builds qpSchema from the output table definitions of
// the first hfta_nbr split queries, then resolves every input table of
// split_queries[hfta_nbr] either locally or via the global Schema.
3324 table_list qpSchema;
3326 // Load the schemas of any LFTAs.
3328 for(l=0;l<hfta_nbr;++l){
3329 stream_query *sq0 = split_queries[l];
3330 table_def *td = sq0->get_output_tabledef();
3331 qpSchema.append_table(td);
3333 // load the schemas of any other ref'd tables.
3335 vector<tablevar_t *> input_tbl_names = split_queries[hfta_nbr]->get_input_tables();
3337 for(ti=0;ti<input_tbl_names.size();++ti){
// Try the locally built schema first; fall back to the global Schema,
// reporting an error if the referenced table exists in neither.
3338 int tbl_ref = qpSchema.get_table_ref(input_tbl_names[ti]->get_schema_name());
3340 tbl_ref = Schema->get_table_ref(input_tbl_names[ti]->get_schema_name());
3342 fprintf(stderr,"ERROR file %s references table %s, which is not in the schema.\n",input_file_names[i].c_str(), (input_tbl_names[ti]->get_schema_name()).c_str());
3345 qpSchema.append_table(Schema->get_table(tbl_ref));
3350 // Functions related to parsing.
3353 static int split_string(char *instr,char sep, char **words,int max_words){
3359 words[nwords++] = str;
3360 while( (loc = strchr(str,sep)) != NULL){
3363 if(nwords >= max_words){
3364 fprintf(stderr,"Error in split_string, too many words discovered (max is %d)\n",max_words);
3365 nwords = max_words-1;
3367 words[nwords++] = str;