1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
16 #include<unistd.h> // for gethostname
19 #include "parse_fta.h"
20 #include "parse_schema.h"
21 #include "parse_ext_fcns.h"
22 #include"analyze_fta.h"
23 #include"query_plan.h"
24 #include"generate_lfta_code.h"
25 #include"stream_query.h"
26 #include"generate_utils.h"
28 #include"generate_nic_code.h"
39 #include <sys/types.h>
45 // to verify that some files exist.
46 #include <sys/types.h>
49 #include "parse_partn.h"
51 #include "print_plan.h"
53 // Interface to the xml parser
56 #include"field_list.h"
// --- Interface to the generated XML parser (xmlParser* symbols come from
// --- the lexer/parser referenced above). Used to read field_list.xml.
58 extern int xmlParserparse(void);
59 extern FILE *xmlParserin;
60 extern int xmlParserdebug;
// Globals the XML parser fills in while scanning: attribute/value lists and
// the attribute/value currently being assembled.
62 std::vector<std::string> xml_attr_vec;
63 std::vector<std::string> xml_val_vec;
64 std::string xml_a, xml_v;
// Parsed XML leaf tree; allocated in main() only if field_list.xml is found.
65 xml_t *xml_leaves = NULL;
67 // Interface to the field list verifier
// Stays NULL unless a field_list.xml is successfully opened and parsed.
68 field_list *field_verifier = NULL;
// Size of the scratch buffer (tmpstr) used for gethostname() and for
// fgets() when reading the various .cfg files in main().
70 #define TMPSTRLEN 1000
// Path separator; used to strip the directory part from query file names.
73 #define PATH_DELIM '/'
76 char tmp_schema_str[10000];
78 // Maximum delay between two heartbeats produced
79 // by a UDOP. Used when it's not explicitly
80 // provided in the udop definition.
81 #define DEFAULT_UDOP_LIVENESS_TIMEOUT 5
83 // Default lfta hash table size, must be power of 2.
// NOTE(review): per-LFTA sizes are read from lfta_htsize.cfg in main();
// presumably this default applies when no entry exists — confirm.
84 int DEFAULT_LFTA_HASH_TABLE_SIZE = 4096;
86 // Interface to FTA definition lexer and parser ...
// Parses both table-schema files and query files; the result kind is
// distinguished via fta_parse_result->parse_type (TABLE_PARSE / QUERY_PARSE).
88 extern int FtaParserparse(void);
89 extern FILE *FtaParserin;
90 extern int FtaParserdebug;
// Output of the most recent FtaParserparse() call.
92 fta_parse_t *fta_parse_result;
93 var_defs_t *fta_parse_defines;
97 // Interface to external function lexer and parser ...
99 extern int Ext_fcnsParserparse(void);
100 extern FILE *Ext_fcnsParserin;
101 extern int Ext_fcnsParserdebug;
// External function list, loaded from external_fcns.def; main() falls back
// to an empty list if the file is missing or fails to parse.
103 ext_fcn_list *Ext_fcns;
106 // Interface to partition definition parser
107 extern int PartnParserparse();
108 partn_def_list_t *partn_parse_result = NULL;
116 // forward declaration of local utility function
// Emits the Makefile (and run/stop scripts, per the -M option) that ties
// together the generated lfta/hfta sources; defined later in this file.
// NOTE(review): the parameter list below appears truncated in this listing
// (no closing ");" is visible) — confirm against the full source.
117 void generate_makefile(vector<string> &input_file_names, int nfiles,
118 vector<string> &hfta_names, opview_set &opviews,
119 vector<string> &machine_names,
120 string schema_file_name,
121 vector<string> &interface_names,
122 ifq_t *ifdb, string &config_dir_path,
125 map<string, vector<int> > &rts_hload
128 //static int split_string(char *instr,char sep, char **words,int max_words);
// Handle for schema_summary.txt; opened in main() when -f is given,
// written by dump_summary() below.
131 FILE *schema_summary_output = NULL; // schema_summary.txt handle (-f option)
133 // Dump schema summary
// Writes one summary record for query `str` to schema_summary_output:
//   line 1: the query name
//   line 2: '|'-separated output field names
//   line 3: '|'-separated output field types
// Assumes schema_summary_output was already opened in main() (-f option).
134 void dump_summary(stream_query *str){
135 fprintf(schema_summary_output,"%s\n",str->query_name.c_str());
// Output schema of the query.
137 table_def *sch = str->get_output_tabledef();
139 vector<field_entry *> flds = sch->get_fields();
// Emit the '|'-delimited field-name line.
// NOTE(review): the declaration of loop index `f` and the loops' closing
// braces are not visible in this listing (lines elided) — confirm against
// the full source.
141 for(f=0;f<flds.size();++f){
142 if(f>0) fprintf(schema_summary_output,"|");
143 fprintf(schema_summary_output,"%s",flds[f]->get_name().c_str());
145 fprintf(schema_summary_output,"\n");
// Emit the '|'-delimited field-type line.
146 for(f=0;f<flds.size();++f){
147 if(f>0) fprintf(schema_summary_output,"|");
148 fprintf(schema_summary_output,"%s",flds[f]->get_type().c_str());
150 fprintf(schema_summary_output,"\n");
154 string hostname; // name of current host.
// NOTE(review): presumably set true by the -S option ("enable LFTA
// statistics (alters Makefile)" per the usage string) — the getopt case
// is not visible in this listing; confirm.
156 bool generate_stats = false;
// Root of the GS-lite tree; the -R option overrides this default
// (per the usage string; getopt case not visible here).
157 string root_path = "../..";
160 int main(int argc, char **argv){
161 char tmpstr[TMPSTRLEN];
165 set<int>::iterator si;
167 vector<string> query_names; // for lfta.c registration
168 map<string, vector<int> > mach_query_names; // list queries of machine
169 vector<int> snap_lengths; // for lfta.c registration
170 vector<string> interface_names; // for lfta.c registration
171 vector<string> machine_names; // machine of interface
172 vector<bool> lfta_reuse_options; // for lfta.c registration
173 vector<int> lfta_liveness_timeouts; // fot qtree.xml generation
174 vector<string> hfta_names; // hfta cource code names, for
175 // creating make file.
176 vector<string> qnames; // ensure unique names
177 map<string, int> lfta_names; // keep track of unique lftas.
180 // set these to 1 to debug the parser
182 Ext_fcnsParserdebug = 0;
184 FILE *lfta_out; // lfta.c output.
185 FILE *fta_in; // input file
186 FILE *table_schemas_in; // source tables definition file
187 FILE *query_name_output; // query names
188 FILE *qtree_output; // interconnections of query nodes
190 // -------------------------------
191 // Handling of Input Arguments
192 // -------------------------------
193 char optstr[] = "BDpLC:l:HNQMf:PSh:n:cR:";
194 const char *usage_str = "Usage: %s [-B] [-D] [-p] [-L] [-N] [-H] [-Q] [-M] [-C <config directory>] [-l <library directory>] [-f] [-P] [-S] [-n n_virtual_interfaces] [-h hostname] [-R root_path] [schema_file] input_file [input file ...]\n"
195 "\t[-B] : debug only (don't create output files)\n"
196 "\t[-D] : distributed mode (will use cluster.ifq instead of local .ifq file)\n"
197 "\t[-p] : partitioned mode (will use partition information in partition.txt to drive query optimization)\n"
198 "\t[-L] : use live_hosts.txt file to restrict queries to a set of live hosts\n"
199 "\t[-C] : use <config directory> for definition files\n"
200 "\t[-l] : use <library directory> for library queries\n"
201 "\t[-N] : output query names in query_names.txt\n"
202 "\t[-H] : create HFTA only (no schema_file)\n"
203 "\t[-Q] : use query name for hfta suffix\n"
204 "\t[-M] : generate make file and runit, stopit scripts\n"
205 "\t[-S] : enable LFTA statistics (alters Makefile).\n"
206 "\t[-f] : Output schema summary to schema_summary.txt\n"
207 "\t[-P] : link with PADS\n"
208 "\t[-h] : override host name.\n"
209 "\t[-c] : clean out Makefile and hfta_*.cc first.\n"
210 "\t[-R] : path to root of GS-lite\n"
213 // parameters gathered from command line processing
214 string external_fcns_path;
215 // string internal_fcn_path;
216 string config_dir_path;
217 string library_path = "./";
218 vector<string> input_file_names;
219 string schema_file_name;
220 bool debug_only = false;
221 bool hfta_only = false;
222 bool output_query_names = false;
223 bool output_schema_summary=false;
224 bool numeric_hfta_flname = true;
225 bool create_makefile = false;
226 bool distributed_mode = false;
227 bool partitioned_mode = false;
228 bool use_live_hosts_file = false;
229 bool use_pads = false;
230 bool clean_make = false;
231 int n_virtual_interfaces = 1;
234 while((chopt = getopt(argc,argv,optstr)) != -1){
240 distributed_mode = true;
243 partitioned_mode = true;
246 use_live_hosts_file = true;
250 config_dir_path = string(optarg) + string("/");
254 library_path = string(optarg) + string("/");
257 output_query_names = true;
260 numeric_hfta_flname = false;
263 if(schema_file_name == ""){
268 output_schema_summary=true;
271 create_makefile=true;
292 n_virtual_interfaces = atoi(optarg);
293 if(n_virtual_interfaces < 1 || n_virtual_interfaces > 128){
294 fprintf(stderr,"Warning, %d virtual interfaces specified, valid values are between 1 and 128 inclusive\n",n_virtual_interfaces);
295 n_virtual_interfaces = 1;
300 fprintf(stderr,"Error, argument %c not recognized.\n",optopt);
301 fprintf(stderr,"%s\n", usage_str);
304 fprintf(stderr, "Argument was %c\n", optopt);
305 fprintf(stderr,"Invalid arguments\n");
306 fprintf(stderr,"%s\n", usage_str);
312 for (int i = 0; i < argc; ++i) {
313 if((schema_file_name == "") && !hfta_only){
314 schema_file_name = argv[i];
316 input_file_names.push_back(argv[i]);
320 if(input_file_names.size() == 0){
321 fprintf(stderr,"%s\n", usage_str);
326 string clean_cmd = "rm Makefile hfta_*.cc";
327 int clean_ret = system(clean_cmd.c_str());
329 fprintf(stderr,"Warning, return value %d when trying to clean out old source files.\n", clean_ret);
334 nic_prop_db *npdb = new nic_prop_db(config_dir_path);
336 // Open globally used file names.
338 // prepend config directory to schema file
339 schema_file_name = config_dir_path + schema_file_name;
340 external_fcns_path = config_dir_path + string("external_fcns.def");
341 string ifx_fname = config_dir_path + string("ifres.xml");
343 // Find interface query file(s).
345 gethostname(tmpstr,TMPSTRLEN);
348 hostname_len = strlen(tmpstr);
349 string ifq_fname = config_dir_path + (distributed_mode ? "cluster" : hostname) + string(".ifq");
350 vector<string> ifq_fls;
352 ifq_fls.push_back(ifq_fname);
355 // Get the field list, if it exists
356 string flist_fl = config_dir_path + "field_list.xml";
358 if((flf_in = fopen(flist_fl.c_str(), "r")) != NULL) {
359 fprintf(stderr,"Found field list file %s\n",flist_fl.c_str());
360 xml_leaves = new xml_t();
361 xmlParser_setfileinput(flf_in);
362 if(xmlParserparse()){
363 fprintf(stderr,"WARNING, could not parse field list file %s\n",flist_fl.c_str());
365 field_verifier = new field_list(xml_leaves);
370 if((table_schemas_in = fopen(schema_file_name.c_str(), "r")) == NULL) {
371 fprintf(stderr,"Can't open schema file %s\n%s\n",schema_file_name.c_str(),strerror(errno));
377 if(!(debug_only || hfta_only)){
378 if((lfta_out = fopen("lfta.c","w")) == NULL){
379 fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
385 // Get the output specification file.
387 // query, operator, operator_param, directory, bucketwidth, partitioning_se, n_partitions
388 string ospec_fl = "output_spec.cfg";
390 vector<ospec_str *> output_specs;
391 multimap<string, int> qname_to_ospec;
392 if((osp_in = fopen(ospec_fl.c_str(), "r")) != NULL) {
395 while(fgets(tmpstr,TMPSTRLEN,osp_in)){
397 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
399 // make operator type lowercase
401 for(tmpc=flds[1];*tmpc!='\0';++tmpc)
402 *tmpc = tolower(*tmpc);
404 ospec_str *tmp_ospec = new ospec_str();
405 tmp_ospec->query = flds[0];
406 tmp_ospec->operator_type = flds[1];
407 tmp_ospec->operator_param = flds[2];
408 tmp_ospec->output_directory = flds[3];
409 tmp_ospec->bucketwidth = atoi(flds[4]);
410 tmp_ospec->partitioning_flds = flds[5];
411 tmp_ospec->n_partitions = atoi(flds[6]);
412 qname_to_ospec.insert(pair<string,int>(tmp_ospec->query,output_specs.size()));
413 output_specs.push_back(tmp_ospec);
415 fprintf(stderr,"Warning, line %d corrupted in output_spec.cfg, has %d fields.\n",o_lineno,nflds);
420 fprintf(stderr,"output_spec.cfg not found. The query set has no output. exiting.\n");
425 string pspec_fl = "hfta_parallelism.cfg";
427 map<string, int> hfta_parallelism;
428 if((psp_in = fopen(pspec_fl.c_str(), "r")) != NULL){
431 while(fgets(tmpstr,TMPSTRLEN,psp_in)){
432 bool good_entry = true;
434 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
436 string hname = flds[0];
437 int par = atoi(flds[1]);
438 if(par <= 0 || par > n_virtual_interfaces){
439 fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must be between 1 and %d\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
442 if(good_entry && n_virtual_interfaces % par != 0){
443 fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must divide the number of virtual interfaces (%d), ignoring.\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
447 hfta_parallelism[hname] = par;
451 fprintf(stderr,"WARNING, no file %s found, using single copies of hftas.\n",pspec_fl.c_str());
455 // LFTA hash table sizes
456 string htspec_fl = "lfta_htsize.cfg";
457 FILE *htsp_in = NULL;
458 map<string, int> lfta_htsize;
459 if((htsp_in = fopen(htspec_fl.c_str(), "r")) != NULL){
462 while(fgets(tmpstr,TMPSTRLEN,htsp_in)){
463 bool good_entry = true;
465 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
467 string lfta_name = flds[0];
468 int htsz = atoi(flds[1]);
470 lfta_htsize[lfta_name] = htsz;
472 fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, htsize is %d, must be larger than 0.\n",o_lineno,htspec_fl.c_str(),htsz);
477 fprintf(stderr,"WARNING, no file %s found, using default LFTA hash table sizes.\n",htspec_fl.c_str());
480 // LFTA virtual interface hash split
481 string rtlspec_fl = "rts_load.cfg";
483 map<string, vector<int> > rts_hload;
484 if((rtl_in = fopen(rtlspec_fl.c_str(), "r")) != NULL){
489 while(fgets(tmpstr,TMPSTRLEN,rtl_in)){
490 bool good_entry = true;
494 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
496 iface_name = flds[0];
499 for(j=1;j<nflds;++j){
500 int h = atoi(flds[j]);
504 hload.push_back(cumm_h);
510 rts_hload[iface_name] = hload;
512 fprintf(stderr,"Warning, line %d of %s is malformed, skipping.\n",r_lineno, rtlspec_fl.c_str());
519 if(output_query_names){
520 if((query_name_output = fopen("query_names.txt","w")) == NULL){
521 fprintf(stderr,"Can't open output file %s\n%s\n","query_names.txt",strerror(errno));
526 if(output_schema_summary){
527 if((schema_summary_output = fopen("schema_summary.txt","w")) == NULL){
528 fprintf(stderr,"Can't open output file %s\n%s\n","schema_summary.txt",strerror(errno));
533 if((qtree_output = fopen("qtree.xml","w")) == NULL){
534 fprintf(stderr,"Can't open output file %s\n%s\n","qtree.xml",strerror(errno));
537 fprintf(qtree_output,"<?xml version='1.0' encoding='ISO-8859-1'?>\n");
538 fprintf(qtree_output,"<?xml-stylesheet type='text/xsl' href='qtree.xsl'?>\n");
539 fprintf(qtree_output,"<QueryNodes>\n");
542 // Get an initial Schema
545 // Parse the table schema definitions.
546 fta_parse_result = new fta_parse_t();
547 FtaParser_setfileinput(table_schemas_in);
548 if(FtaParserparse()){
549 fprintf(stderr,"Table schema parse failed.\n");
552 if(fta_parse_result->parse_type != TABLE_PARSE){
553 fprintf(stderr,"ERROR, file %s is not a table definition file.\n",schema_file_name.c_str());
556 Schema = fta_parse_result->tables;
558 // Ensure that all schema_ids, if set, are distinct.
559 // Obsolete? There is code elsewhere to ensure that schema IDs are
560 // distinct on a per-interface basis.
564 for(int t=0;t<Schema->size();++t){
565 int sch_id = Schema->get_table(t)->get_schema_id();
567 if(found_ids.find(sch_id) != found_ids.end()){
568 dup_ids.insert(sch_id);
570 found_ids.insert(sch_id);
573 if(dup_ids.size()>0){
574 fprintf(stderr, "Error, the schema has duplicate schema_ids:");
575 for(auto dit=dup_ids.begin();dit!=dup_ids.end(); ++dit)
576 fprintf(stderr," %d",(*dit));
577 fprintf(stderr,"\n");
584 // Process schema field inheritance
586 retval = Schema->unroll_tables(err_str);
588 fprintf(stderr,"Error processing schema field inheritance:\n %s\n", err_str.c_str() );
592 // hfta only => we will try to fetch schemas from the registry.
593 // therefore, start off with an empty schema.
594 Schema = new table_list();
598 // Open and parse the external functions file.
599 Ext_fcnsParserin = fopen(external_fcns_path.c_str(),"r");
600 if(Ext_fcnsParserin == NULL){
601 fprintf(stderr,"Warning, can't find external functions definition file (external_fcns.def).\n");
602 Ext_fcns = new ext_fcn_list();
604 if(Ext_fcnsParserparse()){
605 fprintf(stderr,"Warning, error parsing external functions definition file (external_fcns.def).\n");
606 Ext_fcns = new ext_fcn_list();
609 if(Ext_fcns->validate_fcns(err_str)){
610 fprintf(stderr,"Error processing external functions definition file (external_fcns.def):\n%s\n",err_str.c_str());
614 // Open and parse the interface resources file.
615 // ifq_t *ifaces_db = new ifq_t();
617 // if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
618 // fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
619 // ifx_fname.c_str(), ierr.c_str());
622 // if(ifaces_db->load_ifqs(ifq_fname, ierr)){
623 // fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
624 // ifq_fname.c_str(), ierr.c_str());
629 // The LFTA code string.
630 // Put the standard preamble here.
631 // NOTE: the hash macros, fcns should go into the run time
632 map<string, string> lfta_val;
633 map<string, string> lfta_prefilter_val;
636 "#include <limits.h>\n\n"
637 "#include \"rts.h\"\n"
638 "#include \"fta.h\"\n"
639 "#include \"lapp.h\"\n"
640 "#include \"rts_udaf.h\"\n\n"
642 // Get any locally defined parsing headers
644 memset(&glob_result, 0, sizeof(glob_result));
646 // do the glob operation
647 int return_value = glob("../../include/lfta/local/*h", GLOB_TILDE, NULL, &glob_result);
648 if(return_value == 0){
649 for(size_t i = 0; i < glob_result.gl_pathc; ++i) {
651 int nflds = split_string(glob_result.gl_pathv[i],'/',flds,1000);
652 lfta_header += "#include \"local/"+string(flds[nflds-1])+"\"\n\n";
655 fprintf(stderr,"Warning, glob on ../../include/lfta/local/*h failed.\n");
659 "#define IS_FILLED(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> ((bucket & 15)<<1)))\n"
660 "#define IS_NEW(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> (((bucket & 15) << 1) + 1)))\n"
661 "#define SET_EMPTY(bitmap,bucket) (bitmap[bucket >> 4] &= (~(0x80000000 >> ((bucket & 15)<<1))))\n"
662 "#define SET_FILLED_AND_NEW(bitmap,bucket) (bitmap[bucket >> 4] |= (0xC0000000 >> ((bucket & 15)<<1)))\n"
667 "gs_uint64_t (*lfta_prefilter)(void *pkt) = NULL; // set at fta_init\n"
669 "#define SLOT_FILLED 0x04\n"
670 "#define SLOT_GEN_BITS 0x03\n"
671 "#define SLOT_HASH_BITS 0xfffffff8\n"
672 "#define SET_BF_BIT(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num)) + (bf_index)) >> 3] |= (0x80 >> ((((bucket) * (bf_num)) + (bf_index)) & 7)))\n"
673 "#define IS_BF_SET(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num))+(bf_index)) >> 3] & (0x80 >> ((((bucket) * (bf_num))+(bf_index)) & 7)))\n"
674 "#define SET_BF_EMPTY(bf_table, bf_num, bf_index, bucket) (bf_table[((bucket * bf_num)+bf_index) >> 3] &= (~0x80 >> (((bucket * bf_num)+bf_index) & 7)))\n"
677 "#define lfta_BOOL_to_hash(x) (x)\n"
678 "#define lfta_USHORT_to_hash(x) (x)\n"
679 "#define lfta_UINT_to_hash(x) (x)\n"
680 "#define lfta_IP_to_hash(x) (x)\n"
681 "#define lfta_IPV6_to_hash(x) ( (x.v[0]) ^ (x.v[1]) ^ (x.v[2]) ^ (x.v[3]))\n"
682 "#define lfta_INT_to_hash(x) (gs_uint32_t)(x)\n"
683 "#define lfta_ULLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
684 "#define lfta_LLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
685 "#define lfta_FLOAT_to_hash(x) ( (( ((gs_uint64_t)(x)) >>32)&0xffffffff) ^ (((gs_uint64_t)(x))&0xffffffff) )\n"
686 "static gs_uint32_t lfta_V_STR_to_hash(struct gs_string x){\n"
687 " gs_uint32_t i,ret=0,tmp_sum = 0;\n"
688 " for(i=0;i<x.length;++i){\n"
689 " tmp_sum |= (x.data[i]) << (8*(i%4));\n"
695 " if((i%4)!=0) ret ^=tmp_sum;\n"
701 //////////////////////////////////////////////////////////////////
702 ///// Get all of the query parse trees
706 int hfta_count = 0; // for numeric suffixes to hfta .cc files
708 //---------------------------
709 // Global info needed for post processing.
711 // Set of operator views ref'd in the query set.
713 // lftas on a per-machine basis.
714 map<string, vector<stream_query *> > lfta_mach_lists;
715 int nfiles = input_file_names.size();
716 vector<stream_query *> hfta_list; // list of hftas.
717 map<string, stream_query *> sq_map; // map from query name to stream query.
720 //////////////////////////////////////////
722 // Open and parse the interface resources file.
723 ifq_t *ifaces_db = new ifq_t();
725 if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
726 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
727 ifx_fname.c_str(), ierr.c_str());
730 if(ifaces_db->load_ifqs(ifq_fls[0], ierr)){
731 fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
732 ifq_fls[0].c_str(), ierr.c_str());
736 map<string, string> qname_to_flname; // for detecting duplicate query names
740 // Parse the files to create a vector of parse trees.
741 // Load qnodes with information to perform a topo sort
742 // based on query dependencies.
743 vector<query_node *> qnodes; // for topo sort.
744 map<string,int> name_node_map; // map query name to qnodes entry
745 for(i=0;i<input_file_names.size();i++){
747 if((fta_in = fopen(input_file_names[i].c_str(), "r")) == NULL) {
748 fprintf(stderr,"Can't open input file %s\n%s",input_file_names[i].c_str(),strerror(errno));
751 fprintf(stderr,"Parsing file %s\n",input_file_names[i].c_str());
753 // Parse the FTA query
754 fta_parse_result = new fta_parse_t();
755 FtaParser_setfileinput(fta_in);
756 if(FtaParserparse()){
757 fprintf(stderr,"FTA parse failed.\n");
760 if(fta_parse_result->parse_type != QUERY_PARSE){
761 fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
765 // returns a list of parse trees
766 vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
767 for(p=0;p<qlist.size();++p){
768 table_exp_t *fta_parse_tree = qlist[p];
769 // query_parse_trees.push_back(fta_parse_tree);
771 // compute the default name -- extract from query name
772 strcpy(tmpstr,input_file_names[i].c_str());
773 char *qname = strrchr(tmpstr,PATH_DELIM);
778 char *qname_end = strchr(qname,'.');
779 if(qname_end != NULL) *qname_end = '\0';
780 string qname_str = qname;
781 string imputed_qname = impute_query_name(fta_parse_tree, qname_str);
783 // Determine visibility. Should I be attaching all of the output methods?
784 if(qname_to_ospec.count(imputed_qname)>0)
785 fta_parse_tree->set_visible(true);
787 fta_parse_tree->set_visible(false);
790 // Create a manipulable representation of the parse tree.
791 // the qnode inherits the visibility assigned to the parse tree.
792 int pos = qnodes.size();
793 qnodes.push_back( new query_node(pos, imputed_qname, input_file_names[i], fta_parse_tree ));
794 name_node_map[ qnodes[pos]->name ] = pos;
795 //printf("name_node_map[ %s ] = %d\n",qnodes[pos]->name.c_str(),pos);
796 // qnames.push_back(impute_query_name(fta_parse_tree, qname_str));
797 // qfiles.push_back(i);
799 // Check for duplicate query names
800 // NOTE : in hfta-only generation, I should
801 // also check with the names of the registered queries.
802 if(qname_to_flname.count(qnodes[pos]->name) > 0){
803 fprintf(stderr,"ERROR duplicate query name %s in files %s and %s.\n",
804 qnodes[pos]->name.c_str(), (qname_to_flname[qnodes[pos]->name]).c_str(), input_file_names[i].c_str());
807 if(Schema->find_tbl(qnodes[pos]->name) >= 0){
808 fprintf(stderr,"ERROR duplicate query name %s (file %s) was already defined as a PROTOCOL.\n",
809 qnodes[pos]->name.c_str(), input_file_names[i].c_str());
812 qname_to_flname[qnodes[pos]->name] = input_file_names[i].c_str();
818 // Add the library queries
821 for(pos=0;pos<qnodes.size();++pos){
823 for(fi = 0;fi<qnodes[pos]->refd_tbls.size();++fi){
824 string src_tbl = qnodes[pos]->refd_tbls[fi];
825 if(qname_to_flname.count(src_tbl) == 0){
826 int last_sep = src_tbl.find_last_of('/');
827 if(last_sep != string::npos){
828 fprintf(stderr,"Finding library query %s for query %s\n",src_tbl.c_str(),qnodes[pos]->name.c_str());
829 string target_qname = src_tbl.substr(last_sep+1);
830 string qpathname = library_path + src_tbl + ".gsql";
831 if((fta_in = fopen(qpathname.c_str(), "r")) == NULL) {
832 fprintf(stderr,"Can't open library file %s, referenced by query %s in file %s\n\t%s\n",qpathname.c_str(),qnodes[pos]->name.c_str(), qname_to_flname[qnodes[pos]->name].c_str(), strerror(errno));
834 fprintf(stderr,"After exit\n");
836 fprintf(stderr,"Parsing file %s\n",qpathname.c_str());
837 // Parse the FTA query
838 fta_parse_result = new fta_parse_t();
839 FtaParser_setfileinput(fta_in);
840 if(FtaParserparse()){
841 fprintf(stderr,"FTA parse failed.\n");
844 if(fta_parse_result->parse_type != QUERY_PARSE){
845 fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
849 map<string, int> local_query_map;
850 vector<string> local_query_names;
851 vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
852 for(p=0;p<qlist.size();++p){
853 table_exp_t *fta_parse_tree = qlist[p];
854 fta_parse_tree->set_visible(false); // assumed to not produce output
855 string imputed_qname = impute_query_name(fta_parse_tree, target_qname);
856 if(imputed_qname == target_qname)
857 imputed_qname = src_tbl;
858 if(local_query_map.count(imputed_qname)>0){
859 fprintf(stderr,"ERROR, multiple queries named %s in library file %s\n",imputed_qname.c_str(), qpathname.c_str());
862 local_query_map[ imputed_qname ] = p;
863 local_query_names.push_back(imputed_qname);
866 if(local_query_map.count(src_tbl)==0){
867 fprintf(stderr,"ERROR, library query file %s has no query named %s\n",qpathname.c_str(),target_qname.c_str());
871 vector<int> worklist;
872 set<int> added_queries;
873 vector<query_node *> new_qnodes;
874 worklist.push_back(local_query_map[target_qname]);
875 added_queries.insert(local_query_map[target_qname]);
877 int qpos = qnodes.size();
878 for(qq=0;qq<worklist.size();++qq){
879 int q_id = worklist[qq];
880 query_node *new_qnode = new query_node(qpos+qq, local_query_names[q_id], qpathname, qlist[q_id] );
881 new_qnodes.push_back( new_qnode);
882 vector<string> refd_tbls = new_qnode->refd_tbls;
884 for(ff = 0;ff<refd_tbls.size();++ff){
885 if(local_query_map.count(refd_tbls[ff])>0 && added_queries.count(local_query_map[refd_tbls[ff]])==0){
887 if(name_node_map.count(refd_tbls[ff])>0){
888 fprintf(stderr,"ERROR, query %s occurs both in the regular query set, file %s, and in library file %s\n",refd_tbls[ff].c_str(), qname_to_flname[refd_tbls[ff]].c_str(), qpathname.c_str() );
891 worklist.push_back(local_query_map[refd_tbls[ff]]);
897 for(qq=0;qq<new_qnodes.size();++qq){
898 int qpos = qnodes.size();
899 qnodes.push_back(new_qnodes[qq]);
900 name_node_map[qnodes[qpos]->name ] = qpos;
901 qname_to_flname[qnodes[qpos]->name ] = qpathname;
915 //---------------------------------------
920 string udop_missing_sources;
921 for(i=0;i<qnodes.size();++i){
923 for(fi = 0;fi<qnodes[i]->refd_tbls.size();++fi){
924 int sid = Schema->find_tbl(qnodes[i]->refd_tbls[fi]);
926 if(Schema->get_schema_type(sid) == OPERATOR_VIEW_SCHEMA){
927 if(name_node_map.count(qnodes[i]->refd_tbls[fi]) == 0){
928 int pos = qnodes.size();
929 qnodes.push_back( new query_node(pos, qnodes[i]->refd_tbls[fi], Schema));
930 name_node_map[ qnodes[pos]->name ] = pos;
931 qnodes[pos]->is_externally_visible = false; // its visible
932 // Need to mark the source queries as visible.
934 string missing_sources = "";
935 for(si=0;si<qnodes[pos]->refd_tbls.size();++si){
936 string src_tbl = qnodes[pos]->refd_tbls[si];
937 if(name_node_map.count(src_tbl)==0){
938 missing_sources += src_tbl + " ";
941 if(missing_sources != ""){
942 udop_missing_sources += "\tUDOP "+qnodes[pos]->name+" references undefined tables "+missing_sources+"\n";
949 if(udop_missing_sources != ""){
950 fprintf(stderr,"ERROR, User Defined OPerators reference source tables that are not part of the query set:\n%s",udop_missing_sources.c_str());
956 ////////////////////////////////////////////////////////////////////
957 /// Check parse trees to verify that some
958 /// global properties are met :
959 /// if q1 reads from q2, then
960 /// q2 is processed before q1
961 /// q1 can supply q2's parameters
962 /// Verify there is no cycle in the reads-from graph.
964 // Compute an order in which to process the
967 // Start by building the reads-from lists.
970 for(i=0;i<qnodes.size();++i){
972 vector<string> refd_tbls = qnodes[i]->refd_tbls;
973 for(fi = 0;fi<refd_tbls.size();++fi){
974 if(name_node_map.count(refd_tbls[fi])>0){
975 //printf("query %d (%s) reads from %s (%d)\n", i, qnodes[i]->name.c_str(),refd_tbls[fi].c_str(),name_node_map[refd_tbls[fi]]);
976 (qnodes[i]->reads_from).insert(name_node_map[refd_tbls[fi]]);
982 // If one query reads the result of another,
983 // check for parameter compatibility. Currently it must
984 // be an exact match. I will move to requiring
985 // containment after re-ordering, but will require
986 // some analysis for code generation which is not
988 //printf("There are %d query nodes.\n",qnodes.size());
991 for(i=0;i<qnodes.size();++i){
992 vector<var_pair_t *> target_params = qnodes[i]->params;
993 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
994 vector<var_pair_t *> source_params = qnodes[(*si)]->params;
995 if(target_params.size() != source_params.size()){
996 fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
1000 for(p=0;p<target_params.size();++p){
1001 if(! (target_params[p]->name == source_params[p]->name &&
1002 target_params[p]->val == source_params[p]->val ) ){
1003 fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
1012 // Start by counting inedges.
1013 for(i=0;i<qnodes.size();++i){
1014 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1015 qnodes[(*si)]->n_consumers++;
1019 // The roots are the nodes with indegree zero.
1021 for(i=0;i<qnodes.size();++i){
1022 if(qnodes[i]->n_consumers == 0){
1023 if(qnodes[i]->is_externally_visible == false){
1024 fprintf(stderr,"WARNING: query %s (file %s) is a root query but it isn't visible. Ignored.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str());
1030 // Remove the parts of the subtree that produce no output.
1031 set<int> valid_roots;
1032 set<int> discarded_nodes;
1033 set<int> candidates;
1034 while(roots.size() >0){
1035 for(si=roots.begin();si!=roots.end();++si){
1036 if(qnodes[(*si)]->is_externally_visible){
1037 valid_roots.insert((*si));
1039 discarded_nodes.insert((*si));
1040 set<int>::iterator sir;
1041 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1042 qnodes[(*sir)]->n_consumers--;
1043 if(qnodes[(*sir)]->n_consumers == 0)
1044 candidates.insert( (*sir));
1051 roots = valid_roots;
1052 if(discarded_nodes.size()>0){
1053 fprintf(stderr,"Warning, the following queries were discarded because they produce no output:\n");
1055 for(si=discarded_nodes.begin();si!=discarded_nodes.end();++si){
1056 if(di>0 && (di%8)==0) fprintf(stderr,"\n");
1058 fprintf(stderr," %s",qnodes[(*si)]->name.c_str());
1060 fprintf(stderr,"\n");
1063 // Compute the sources_to set, ignoring discarded nodes.
1064 for(i=0;i<qnodes.size();++i){
1065 if(discarded_nodes.count(i)==0)
1066 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1067 qnodes[(*si)]->sources_to.insert(i);
1072 // Find the nodes that are shared by multiple visible subtrees.
1073 // The roots become inferred visible nodes.
1075 // Find the visible nodes.
1076 vector<int> visible_nodes;
1077 for(i=0;i<qnodes.size();i++){
1078 if(qnodes[i]->is_externally_visible){
1079 visible_nodes.push_back(i);
1083 // Find UDOPs referenced by visible nodes.
1085 for(i=0;i<visible_nodes.size();++i){
1086 workq.push_back(visible_nodes[i]);
1088 while(!workq.empty()){
1089 int node = workq.front();
1091 set<int>::iterator children;
1092 if(qnodes[node]->is_udop && qnodes[node]->is_externally_visible == false){
1093 qnodes[node]->is_externally_visible = true;
1094 visible_nodes.push_back(node);
1095 for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1096 if(qnodes[(*children)]->is_externally_visible == false){
1097 qnodes[(*children)]->is_externally_visible = true;
1098 visible_nodes.push_back((*children));
1102 for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1103 workq.push_back((*children));
1110 for(i=0;i<qnodes.size();i++){
1111 qnodes[i]->subtree_roots.clear();
1114 // Walk the tree defined by a visible node, not descending into
1115 // subtrees rooted by a visible node. Mark the node visited with
1116 // the visible node ID.
1117 for(i=0;i<visible_nodes.size();++i){
1119 vroots.insert(visible_nodes[i]);
1120 while(vroots.size()>0){
1121 for(si=vroots.begin();si!=vroots.end();++si){
1122 qnodes[(*si)]->subtree_roots.insert(visible_nodes[i]);
1124 set<int>::iterator sir;
1125 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1126 if(! qnodes[(*sir)]->is_externally_visible){
1127 candidates.insert( (*sir));
1131 vroots = candidates;
1135 // Find the nodes in multiple visible node subtrees, but with no parent
1136 // that is in multiple visible node subtrees. Mark these as inferred visible nodes.
1137 done = true; // until proven otherwise
1138 for(i=0;i<qnodes.size();i++){
1139 if(qnodes[i]->subtree_roots.size()>1){
1140 bool is_new_root = true;
1141 set<int>::iterator sir;
1142 for(sir=qnodes[i]->sources_to.begin(); sir!=qnodes[i]->sources_to.end();++sir){
1143 if(qnodes[(*sir)]->subtree_roots.size()>1)
1144 is_new_root = false;
1147 qnodes[i]->is_externally_visible = true;
1148 qnodes[i]->inferred_visible_node = true;
1149 visible_nodes.push_back(i);
1160 // get visible nodes in topo ordering.
1161 // for(i=0;i<qnodes.size();i++){
1162 // qnodes[i]->n_consumers = qnodes[i]->sources_to.size();
1164 vector<int> process_order;
1165 while(roots.size() >0){
1166 for(si=roots.begin();si!=roots.end();++si){
1167 if(discarded_nodes.count((*si))==0){
1168 process_order.push_back( (*si) );
1170 set<int>::iterator sir;
1171 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1172 qnodes[(*sir)]->n_consumers--;
1173 if(qnodes[(*sir)]->n_consumers == 0)
1174 candidates.insert( (*sir));
1182 //printf("process_order.size() =%d\n",process_order.size());
1184 // Search for cyclic dependencies
1186 for(i=0;i<qnodes.size();++i){
1187 if(discarded_nodes.count(i)==0 && qnodes[i]->n_consumers > 0){
1188 if(found_dep.size() != 0) found_dep += ", ";
1189 found_dep += "query "+qnodes[i]->name+" (file "+qnodes[i]->file+")";
1192 if(found_dep.size()>0){
1193 fprintf(stderr,"ERROR, the following queries contain a cyclic reads-from dependency:\n%s\n",found_dep.c_str());
1197 // Get a list of query sets, in the order to be processed.
1198 // Start at visible root and do bfs.
1199 // The query set includes queries referenced indirectly,
1200 // as sources for user-defined operators. These are needed
1201 // to ensure that they are added to the schema, but are not part
1202 // of the query tree.
1204 // stream_node_sets contains queries reachable only through the
1205 // FROM clause, so I can tell which queries to add to the stream
1206 // query. (DISABLED, UDOPS are integrated, does this cause problems?)
1208 // NOTE: this code works because in order for data to be
1209 // read by multiple hftas, the node must be externally visible.
1210 // But visible nodes define roots of process sets.
1211 // internally visible nodes can feed data only
1212 // to other nodes in the same query file.
1213 // Therefore, any access can be restricted to a file,
1214 // hfta output sharing is done only on roots
1215 // never on interior nodes.
1220 // Compute the base collection of hftas.
1221 vector<hfta_node *> hfta_sets;
1222 map<string, int> hfta_name_map;
1223 // vector< vector<int> > process_sets;
1224 // vector< set<int> > stream_node_sets;
1225 reverse(process_order.begin(), process_order.end()); // get listing in reverse order.
1226 // i.e. process leaves 1st.
1227 for(i=0;i<process_order.size();++i){
1228 if(qnodes[process_order[i]]->is_externally_visible == true){
1229 //printf("Visible.\n");
1230 int root = process_order[i];
1231 hfta_node *hnode = new hfta_node();
1232 hnode->name = qnodes[root]-> name;
1233 hnode->source_name = qnodes[root]-> name;
1234 hnode->is_udop = qnodes[root]->is_udop;
1235 hnode->inferred_visible_node = qnodes[root]->inferred_visible_node;
1237 vector<int> proc_list; proc_list.push_back(root);
1238 // Ensure that nodes are added only once.
1239 set<int> proc_set; proc_set.insert(root);
1240 roots.clear(); roots.insert(root);
1242 while(roots.size()>0){
1243 for(si=roots.begin();si!=roots.end();++si){
1244 //printf("Processing root %d\n",(*si));
1245 set<int>::iterator sir;
1246 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1247 //printf("reads fom %d\n",(*sir));
1248 if(qnodes[(*sir)]->is_externally_visible==false){
1249 candidates.insert( (*sir) );
1250 if(proc_set.count( (*sir) )==0){
1251 proc_set.insert( (*sir) );
1252 proc_list.push_back( (*sir) );
1261 reverse(proc_list.begin(), proc_list.end());
1262 hnode->query_node_indices = proc_list;
1263 hfta_name_map[hnode->name] = hfta_sets.size();
1264 hfta_sets.push_back(hnode);
1268 // Compute the reads_from / sources_to graphs for the hftas.
1270 for(i=0;i<hfta_sets.size();++i){
1271 hfta_node *hnode = hfta_sets[i];
1272 for(q=0;q<hnode->query_node_indices.size();q++){
1273 query_node *qnode = qnodes[hnode->query_node_indices[q]];
1274 for(s=0;s<qnode->refd_tbls.size();++s){
1275 if(hfta_name_map.count(qnode->refd_tbls[s])){
1276 int other_hfta = hfta_name_map[qnode->refd_tbls[s]];
1277 hnode->reads_from.insert(other_hfta);
1278 hfta_sets[other_hfta]->sources_to.insert(i);
1284 // Compute a topological sort of the hfta_sets.
1286 vector<int> hfta_topsort;
1288 int hnode_srcs[hfta_sets.size()];
1289 for(i=0;i<hfta_sets.size();++i){
1291 if(hfta_sets[i]->sources_to.size() == 0)
1295 while(! workq.empty()){
1296 int node = workq.front();
1298 hfta_topsort.push_back(node);
1299 set<int>::iterator stsi;
1300 for(stsi=hfta_sets[node]->reads_from.begin();stsi!=hfta_sets[node]->reads_from.end();++stsi){
1301 int parent = (*stsi);
1302 hnode_srcs[parent]++;
1303 if(hnode_srcs[parent] == hfta_sets[parent]->sources_to.size()){
1304 workq.push_back(parent);
1309 // Decorate hfta nodes with the level of parallelism given as input.
1311 map<string, int>::iterator msii;
1312 for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1313 string hfta_name = (*msii).first;
1314 int par = (*msii).second;
1315 if(hfta_name_map.count(hfta_name) > 0){
1316 hfta_sets[ hfta_name_map[hfta_name] ]->n_parallel = par;
1318 fprintf(stderr,"Warning, hfta_parallelism.cfg has an entry for %s, but its not a hfta.\n",hfta_name.c_str());
1322 // Propagate levels of parallelism: children should have a level of parallelism
1323 // as large as any of its parents. Adjust children upwards to compensate.
1324 // Start at parents and adjust children, auto-propagation will occur.
1326 for(i=hfta_sets.size()-1;i>=0;i--){
1327 set<int>::iterator stsi;
1328 for(stsi=hfta_sets[i]->reads_from.begin();stsi!=hfta_sets[i]->reads_from.end();++stsi){
1329 if(hfta_sets[i]->n_parallel > hfta_sets[ (*stsi) ]->n_parallel){
1330 hfta_sets[ (*stsi) ]->n_parallel = hfta_sets[i]->n_parallel;
1335 // Before all the name mangling, check if there are any output_spec.cfg
1336 // or hfta_parallelism.cfg entries that do not have a matching query.
1338 string dangling_ospecs = "";
1339 for(msii=qname_to_ospec.begin();msii!=qname_to_ospec.end();++msii){
1340 string oq = (*msii).first;
1341 if(hfta_name_map.count(oq) == 0){
1342 dangling_ospecs += " "+(*msii).first;
1345 if(dangling_ospecs!=""){
1346 fprintf(stderr,"WARNING, the following entries in output_spec.cfg don't have a matching query: %s\n",dangling_ospecs.c_str());
1349 string dangling_par = "";
1350 for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1351 string oq = (*msii).first;
1352 if(hfta_name_map.count(oq) == 0){
1353 dangling_par += " "+(*msii).first;
1356 if(dangling_par!=""){
1357 fprintf(stderr,"WARNING, the following entries in hfta_parallelism.cfg don't have a matching query: %s\n",dangling_par.c_str());
1362 // Replicate parallelized hftas. Do __copyX name mangling. Adjust
1363 // FROM clauses: retarget any name which is an internal node, and
1364 // any which is in hfta_sets (and which is parallelized). Add Merge nodes
1365 // when the source hfta has more parallelism than the target node.
1366 // Add new nodes to qnodes and hfta_nodes with the appropriate linkages.
1369 int n_original_hfta_sets = hfta_sets.size();
1370 for(i=0;i<n_original_hfta_sets;++i){
1371 if(hfta_sets[i]->n_parallel > 1){
1372 hfta_sets[i]->do_generation =false; // set the deletion flag for this entry.
1373 set<string> local_nodes; // names of query nodes in the hfta.
1374 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1375 local_nodes.insert(qnodes[ hfta_sets[i]->query_node_indices[h] ]->name);
1378 for(p=0;p<hfta_sets[i]->n_parallel;++p){
1379 string mangler = "__copy"+int_to_string(p);
1380 hfta_node *par_hfta = new hfta_node();
1381 par_hfta->name = hfta_sets[i]->name + mangler;
1382 par_hfta->source_name = hfta_sets[i]->name;
1383 par_hfta->is_udop = hfta_sets[i]->is_udop;
1384 par_hfta->inferred_visible_node = hfta_sets[i]->inferred_visible_node;
1385 par_hfta->n_parallel = hfta_sets[i]->n_parallel;
1386 par_hfta->parallel_idx = p;
1388 map<string, int> par_qnode_map; // qnode name-to-idx, aids dependency tracking.
1391 if(hfta_sets[i]->is_udop){
1392 int root = hfta_sets[i]->query_node_indices[0];
1394 string unequal_par_sources;
1395 set<int>::iterator rfsii;
1396 for(rfsii=hfta_sets[i]->reads_from.begin();rfsii!=hfta_sets[i]->reads_from.end();++rfsii){
1397 if(hfta_sets[(*rfsii)]->n_parallel != par_hfta->n_parallel){
1398 unequal_par_sources = hfta_sets[(*rfsii)]->name+" ("+int_to_string(hfta_sets[(*rfsii)]->n_parallel)+") ";
1401 if(unequal_par_sources != ""){
1402 fprintf(stderr,"ERROR, UDOP %s has parallelism %d, but some of its sources have a different parallelism: %s\n",hfta_sets[i]->name.c_str(), hfta_sets[i]->n_parallel, unequal_par_sources.c_str());
1407 vector<string> new_sources;
1408 for(rti=0;rti<qnodes[root]->refd_tbls.size();++rti){
1409 new_sources.push_back(qnodes[root]->refd_tbls[rti]+mangler);
1412 query_node *new_qn = new query_node(qnodes.size(), qnodes[root]->name, Schema);
1413 new_qn->name += mangler;
1414 new_qn->mangler = mangler;
1415 new_qn->refd_tbls = new_sources;
1416 par_hfta->query_node_indices.push_back(qnodes.size());
1417 par_qnode_map[new_qn->name] = qnodes.size();
1418 name_node_map[ new_qn->name ] = qnodes.size();
1419 qnodes.push_back(new_qn);
1421 // regular query node
1422 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1423 int hqn_idx = hfta_sets[i]->query_node_indices[h];
1424 table_exp_t *dup_pt = dup_table_exp(qnodes[hqn_idx]->parse_tree);
1425 // rehome the from clause on mangled names.
1426 // create merge nodes as needed for external sources.
1427 for(f=0;f<dup_pt->fm->tlist.size();++f){
1428 if(local_nodes.count(dup_pt->fm->tlist[f]->schema_name)>0){
1429 dup_pt->fm->tlist[f]->schema_name += mangler;
1430 }else if(hfta_name_map.count(dup_pt->fm->tlist[f]->schema_name)>0){
1431 // Refs an external HFTA. No parallelism => leave as is. Else, if the level of parallelism of the two hftas is the same, rename by the mangler. Else, there must be more sources, so create a merge node.
1432 int other_hidx = hfta_name_map[dup_pt->fm->tlist[f]->schema_name];
1433 if(par_hfta->n_parallel == hfta_sets[other_hidx]->n_parallel){
1434 dup_pt->fm->tlist[f]->schema_name += mangler;
1436 vector<string> src_tbls;
1437 int stride = hfta_sets[other_hidx]->n_parallel / par_hfta->n_parallel;
1439 fprintf(stderr,"INTERNAL ERROR, parent hfta %s has a larger parallelism than its child %s\n",par_hfta->name.c_str(), hfta_sets[other_hidx]->name.c_str());
1442 for(s=0;s<stride;++s){
1443 string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s+stride*p);
1444 src_tbls.push_back(ext_src_name);
1446 table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1447 string merge_node_name = qnodes[hqn_idx]->name+"__copy"+int_to_string(p)+"__merge"+int_to_string(f);
1448 dup_pt->fm->tlist[f]->schema_name = merge_node_name;
1449 // Make a qnode to represent the new merge node
1450 query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1451 qn_pt->refd_tbls = src_tbls;
1452 qn_pt->is_udop = false;
1453 qn_pt->is_externally_visible = false;
1454 qn_pt->inferred_visible_node = false;
1455 par_hfta->query_node_indices.push_back(qnodes.size());
1456 par_qnode_map[merge_node_name] = qnodes.size();
1457 name_node_map[ merge_node_name ] = qnodes.size();
1458 qnodes.push_back(qn_pt);
1462 query_node *new_qn = new query_node(qnodes.size(),qnodes[hqn_idx]->name+"__copy"+int_to_string(p),qnodes[hqn_idx]->file,dup_pt);
1463 for(f=0;f<dup_pt->fm->tlist.size();++f){
1464 new_qn->refd_tbls.push_back(dup_pt->fm->tlist[f]->schema_name);
1466 new_qn->params = qnodes[hqn_idx]->params;
1467 new_qn->is_udop = false;
1468 new_qn->is_externally_visible = qnodes[hqn_idx]->is_externally_visible;
1469 new_qn->inferred_visible_node = qnodes[hqn_idx]->inferred_visible_node;
1470 par_hfta->query_node_indices.insert(par_hfta->query_node_indices.begin(),qnodes.size());
1471 par_qnode_map[new_qn->name] = qnodes.size();
1472 name_node_map[ new_qn->name ] = qnodes.size();
1473 qnodes.push_back(new_qn);
1476 hfta_name_map[par_hfta->name] = hfta_sets.size();
1477 hfta_sets.push_back(par_hfta);
1480 // This hfta isn't being parallelized, but add merge nodes for any parallelized
1482 if(!hfta_sets[i]->is_udop){
1483 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1484 int hqn_idx = hfta_sets[i]->query_node_indices[h];
1485 for(f=0;f<qnodes[hqn_idx]->parse_tree->fm->tlist.size();++f){
1486 if(hfta_name_map.count(qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name)>0){
1487 // Refs an external HFTA. No parallelism => leave as is. Else, if the level of parallelism of the two hftas is the same, rename by the mangler. Else, there must be more sources, so create a merge node.
1488 int other_hidx = hfta_name_map[qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name];
1489 if(hfta_sets[i]->n_parallel != hfta_sets[other_hidx]->n_parallel){
1490 vector<string> src_tbls;
1491 for(s=0;s<hfta_sets[other_hidx]->n_parallel;++s){
1492 string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s);
1493 src_tbls.push_back(ext_src_name);
1495 table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1496 string merge_node_name = qnodes[hqn_idx]->name+"__merge"+int_to_string(f);
1497 qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name = merge_node_name;
1498 // Make a qnode to represent the new merge node
1499 query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1500 qn_pt->refd_tbls = src_tbls;
1501 qn_pt->is_udop = false;
1502 qn_pt->is_externally_visible = false;
1503 qn_pt->inferred_visible_node = false;
1504 hfta_sets[i]->query_node_indices.insert(hfta_sets[i]->query_node_indices.begin(),qnodes.size());
1505 name_node_map[ merge_node_name ] = qnodes.size();
1506 qnodes.push_back(qn_pt);
1515 // Rebuild the reads_from / sources_to lists in the qnodes
1516 for(q=0;q<qnodes.size();++q){
1517 qnodes[q]->reads_from.clear();
1518 qnodes[q]->sources_to.clear();
1520 for(q=0;q<qnodes.size();++q){
1521 for(s=0;s<qnodes[q]->refd_tbls.size();++s){
1522 if(name_node_map.count(qnodes[q]->refd_tbls[s])>0){
1523 int rf = name_node_map[qnodes[q]->refd_tbls[s]];
1524 qnodes[q]->reads_from.insert(rf);
1525 qnodes[rf]->sources_to.insert(q);
1530 // Rebuild the reads_from / sources_to lists in hfta_sets
1531 for(q=0;q<hfta_sets.size();++q){
1532 hfta_sets[q]->reads_from.clear();
1533 hfta_sets[q]->sources_to.clear();
1535 for(q=0;q<hfta_sets.size();++q){
1536 for(s=0;s<hfta_sets[q]->query_node_indices.size();++s){
1537 int node = hfta_sets[q]->query_node_indices[s];
1538 set<int>::iterator rfsii;
1539 for(rfsii=qnodes[node]->reads_from.begin();rfsii!=qnodes[node]->reads_from.end();++rfsii){
1540 if(hfta_name_map.count(qnodes[(*rfsii)]->name)>0){
1541 hfta_sets[q]->reads_from.insert(hfta_name_map[qnodes[(*rfsii)]->name]);
1542 hfta_sets[hfta_name_map[qnodes[(*rfsii)]->name]]->sources_to.insert(q);
1549 for(q=0;q<qnodes.size();++q){
1550 printf("qnode %d reads-from %d:",q,qnodes[q]->reads_from.size());
1551 set<int>::iterator rsii;
1552 for(rsii=qnodes[q]->reads_from.begin();rsii!=qnodes[q]->reads_from.end();++rsii)
1553 printf(" %d",(*rsii));
1554 printf(", and sources-to %d:",qnodes[q]->sources_to.size());
1555 for(rsii=qnodes[q]->sources_to.begin();rsii!=qnodes[q]->sources_to.end();++rsii)
1556 printf(" %d",(*rsii));
1560 for(q=0;q<hfta_sets.size();++q){
1561 if(hfta_sets[q]->do_generation==false)
1563 printf("hfta %d (%s) reads-from %d:",q,hfta_sets[q]->name.c_str(),hfta_sets[q]->reads_from.size());
1564 set<int>::iterator rsii;
1565 for(rsii=hfta_sets[q]->reads_from.begin();rsii!=hfta_sets[q]->reads_from.end();++rsii)
1566 printf(" %d",(*rsii));
1567 printf(", and sources-to %d:",hfta_sets[q]->sources_to.size());
1568 for(rsii=hfta_sets[q]->sources_to.begin();rsii!=hfta_sets[q]->sources_to.end();++rsii)
1569 printf(" %d",(*rsii));
1576 // Re-topo sort the hftas
1577 hfta_topsort.clear();
1579 int hnode_srcs_2[hfta_sets.size()];
1580 for(i=0;i<hfta_sets.size();++i){
1581 hnode_srcs_2[i] = 0;
1582 if(hfta_sets[i]->sources_to.size() == 0 && hfta_sets[i]->do_generation){
1587 while(workq.empty() == false){
1588 int node = workq.front();
1590 hfta_topsort.push_back(node);
1591 set<int>::iterator stsii;
1592 for(stsii=hfta_sets[node]->reads_from.begin();stsii!=hfta_sets[node]->reads_from.end();++stsii){
1593 int child = (*stsii);
1594 hnode_srcs_2[child]++;
1595 if(hnode_srcs_2[child] == hfta_sets[child]->sources_to.size()){
1596 workq.push_back(child);
1601 // Ensure that all of the query_node_indices in hfta_sets are topologically
1602 // sorted, don't rely on assumptions that all transforms maintain some kind of order.
1603 for(i=0;i<hfta_sets.size();++i){
1604 if(hfta_sets[i]->do_generation){
1605 map<int,int> n_accounted;
1606 vector<int> new_order;
1608 vector<int>::iterator vii;
1609 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1610 n_accounted[(*vii)]= 0;
1612 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1613 set<int>::iterator rfsii;
1614 for(rfsii=qnodes[(*vii)]->reads_from.begin();rfsii!=qnodes[(*vii)]->reads_from.end();++rfsii){
1615 if(n_accounted.count((*rfsii)) == 0){
1616 n_accounted[(*vii)]++;
1619 if(n_accounted[(*vii)] == qnodes[(*vii)]->reads_from.size()){
1620 workq.push_back((*vii));
1624 while(workq.empty() == false){
1625 int node = workq.front();
1627 new_order.push_back(node);
1628 set<int>::iterator stsii;
1629 for(stsii=qnodes[node]->sources_to.begin();stsii!=qnodes[node]->sources_to.end();++stsii){
1630 if(n_accounted.count((*stsii))){
1631 n_accounted[(*stsii)]++;
1632 if(n_accounted[(*stsii)] == qnodes[(*stsii)]->reads_from.size()){
1633 workq.push_back((*stsii));
1638 hfta_sets[i]->query_node_indices = new_order;
1646 /// Global checking is done, start the analysis and translation
1647 /// of the query parse tree in the order specified by process_order
1650 // Get a list of the LFTAs for global lfta optimization
1651 // TODO: separate building operators from spliting lftas,
1652 // that will make optimizations such as predicate pushing easier.
1653 vector<stream_query *> lfta_list;
1654 stream_query *rootq;
1657 map<string, map<int, string> > schema_of_schemaid; // ensure that schema IDs are unique to an <interface, schema>
1659 for(qi=hfta_topsort.size()-1;qi>=0;--qi){
1661 int hfta_id = hfta_topsort[qi];
1662 vector<int> curr_list = hfta_sets[hfta_id]->query_node_indices;
1666 // Two possibilities, either its a UDOP, or its a collection of queries.
1667 // if(qnodes[curr_list.back()]->is_udop)
1668 if(hfta_sets[hfta_id]->is_udop){
1669 int node_id = curr_list.back();
1670 int udop_schref = Schema->find_tbl(qnodes[node_id]->file);
1671 opview_entry *opv = new opview_entry();
1673 // Many of the UDOP properties aren't currently used.
1674 opv->parent_qname = "no_parent";
1675 opv->root_name = qnodes[node_id]->name;
1676 opv->view_name = qnodes[node_id]->file;
1678 sprintf(tmpstr,"%s_UDOP%d_%s",qnodes[node_id]->name.c_str(),qi,opv->view_name.c_str());
1679 opv->udop_alias = tmpstr;
1680 opv->mangler = qnodes[node_id]->mangler;
1682 if(opv->mangler != ""){
1683 int new_udop_schref = Schema->add_duplicate_table(opv->view_name,opv->root_name);
1684 Schema->mangle_subq_names(new_udop_schref,opv->mangler);
1687 // This piece of code makes each hfta which refers to the same udop
1688 // reference a distinct running udop. Do this at query optimization time?
1689 // fmtbl->set_udop_alias(opv->udop_alias);
1691 opv->exec_fl = Schema->get_op_prop(udop_schref, string("file"));
1692 opv->liveness_timeout = atoi(Schema->get_op_prop(udop_schref, string("liveness_timeout")).c_str());
1694 vector<subquery_spec *> subq = Schema->get_subqueryspecs(udop_schref);
1696 for(s=0;s<subq.size();++s){
1697 // Validate that the fields match.
1698 subquery_spec *sqs = subq[s];
1699 string subq_name = sqs->name + opv->mangler;
1700 vector<field_entry *> flds = Schema->get_fields(subq_name);
1701 if(flds.size() == 0){
1702 fprintf(stderr,"INTERNAL ERROR: the subquery %s of view %s not found in Schema.\n",subq_name.c_str(), opv->view_name.c_str());
1705 if(flds.size() < sqs->types.size()){
1706 fprintf(stderr,"ERROR: subquery %s of view %s does not have enough fields (%lu found, %lu expected).\n",subq_name.c_str(), opv->view_name.c_str(),flds.size(), sqs->types.size());
1709 bool failed = false;
1710 for(f=0;f<sqs->types.size();++f){
1711 data_type dte(sqs->types[f],sqs->modifiers[f]);
1712 data_type dtf(flds[f]->get_type(),flds[f]->get_modifier_list());
1713 if(! dte.subsumes_type(&dtf) ){
1714 fprintf(stderr,"ERROR: subquery %s of view %s does not have the correct type for field %d (%s found, %s expected).\n",subq_name.c_str(), opv->view_name.c_str(),f,dtf.to_string().c_str(), dte.to_string().c_str());
1718 if(dte.is_temporal() && (dte.get_temporal() != dtf.get_temporal()) ){
1719 string pstr = dte.get_temporal_string();
1720 fprintf(stderr,"ERROR: subquery %s of view %s does not have the expected temporal value %s of field %d.\n",sqs->name.c_str(), opv->view_name.c_str(),pstr.c_str(),f);
1727 /// Validation done, find the subquery, make a copy of the
1728 /// parse tree, and add it to the return list.
1729 for(q=0;q<qnodes.size();++q)
1730 if(qnodes[q]->name == subq_name)
1732 if(q==qnodes.size()){
1733 fprintf(stderr,"INTERNAL ERROR: subquery %s of view %s not found in list of query names.\n",subq_name.c_str(), opv->view_name.c_str());
1739 // Cross-link to from entry(s) in all sourced-to tables.
1740 set<int>::iterator sii;
1741 for(sii=qnodes[curr_list.back()]->sources_to.begin() ;sii!=qnodes[curr_list.back()]->sources_to.end();++sii){
1742 //printf("\tUDOP %s sources_to %d (%s)\n",hfta_sets[hfta_id]->name.c_str(),(*sii),hfta_sets[(*sii)]->name.c_str());
1743 vector<tablevar_t *> tblvars = qnodes[(*sii)]->parse_tree->get_from()->get_table_list();
1745 for(ii=0;ii<tblvars.size();++ii){
1746 if(tblvars[ii]->schema_name == opv->root_name){
1747 tblvars[ii]->set_opview_idx(opviews.size());
1753 opviews.append(opv);
1756 // Analyze the parse trees in this query,
1757 // put them in rootq
1758 // vector<int> curr_list = process_sets[qi];
1761 ////////////////////////////////////////
1764 //printf("Process set %d, has %d queries\n",qi,curr_list.size());
1765 for(qj=0;qj<curr_list.size();++qj){
1767 fprintf(stderr,"Processing query %s (file %s) is_udop = %d\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(),qnodes[i]->is_udop);
1769 // Select the current query parse tree
1770 table_exp_t *fta_parse_tree = qnodes[i]->parse_tree;
1772 // if hfta only, try to fetch any missing schemas
1773 // from the registry (using the print_schema program).
1774 // Here I use a hack to avoid analyzing the query -- all referenced
1775 // tables must be in the from clause
1776 // If there is a problem loading any table, just issue a warning,
1778 tablevar_list_t *fm = fta_parse_tree->get_from();
1779 vector<string> refd_tbls = fm->get_src_tbls(Schema);
1780 // iterate over all referenced tables
1782 for(t=0;t<refd_tbls.size();++t){
1783 int tbl_ref = Schema->get_table_ref(refd_tbls[t]);
1785 if(tbl_ref < 0){ // if this table is not in the Schema
1788 string cmd="print_schema "+refd_tbls[t];
1789 FILE *schema_in = popen(cmd.c_str(), "r");
1790 if(schema_in == NULL){
1791 fprintf(stderr,"ERROR: cannot execute command %s\n",cmd.c_str());
1793 string schema_instr;
1794 while(fgets(tmpstr,TMPSTRLEN,schema_in)){
1795 schema_instr += tmpstr;
1797 fta_parse_result = new fta_parse_t();
1798 strcpy(tmp_schema_str,schema_instr.c_str());
1799 FtaParser_setstringinput(tmp_schema_str);
1800 if(FtaParserparse()){
1801 fprintf(stderr,"FTA parse failed on result from command %s.\n",cmd.c_str());
1803 if( fta_parse_result->tables != NULL){
1805 for(tl=0;tl<fta_parse_result->tables->size();++tl){
1806 Schema->add_table(fta_parse_result->tables->get_table(tl));
1809 fprintf(stderr,"ERROR command %s returned no tables.\n",cmd.c_str());
1814 fprintf(stderr,"ERROR, query %s (file %s) references stream %s, which is neither a PROTOCOL nor an externally visible query stream.\n", qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), refd_tbls[t].c_str());
1822 // Analyze the query.
1823 query_summary_class *qs = analyze_fta(fta_parse_tree, Schema, Ext_fcns, qnodes[i]->name);
1825 fprintf(stderr,"analyze_fta failed on query %s (file %s) \n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str());
1829 stream_query new_sq(qs, Schema);
1830 if(new_sq.error_code){
1831 fprintf(stderr,"ERROR, can't create query node for query %s (file %s):\n%s\n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str(),new_sq.err_str.c_str());
1835 // Add it to the Schema
1836 table_def *output_td = new_sq.get_output_tabledef();
1837 Schema->add_table(output_td);
1839 // Create a query plan from the analyzed parse tree.
1840 // If it's a query referenced via FROM, add it to the stream query.
1842 rootq->add_query(new_sq);
1844 rootq = new stream_query(new_sq);
1845 // have the stream query object inherit properties form the analyzed
1846 // hfta_node object.
1847 rootq->set_nparallel(hfta_sets[hfta_id]->n_parallel,hfta_sets[hfta_id]->parallel_idx);
1848 rootq->n_successors = hfta_sets[hfta_id]->sources_to.size();
1854 // This stream query has all its parts
1855 // Build and optimize it.
1856 //printf("translate_fta: generating plan.\n");
1857 if(rootq->generate_plan(Schema)){
1858 fprintf(stderr,"INTERNAL ERROR: Can't generate query plan for query %s.\n",rootq->query_name.c_str());
1862 // If we've found the query plan head, so now add the output operators
1863 if(qname_to_ospec.count(hfta_sets[hfta_id]->source_name)){
1864 pair< multimap<string, int>::iterator, multimap<string, int>::iterator > oset;
1865 multimap<string, int>::iterator mmsi;
1866 oset = qname_to_ospec.equal_range(hfta_sets[hfta_id]->source_name);
1867 for(mmsi=oset.first; mmsi!=oset.second; ++mmsi){
1868 rootq->add_output_operator(output_specs[(*mmsi).second]);
1874 // Perform query splitting if necessary.
1876 vector<stream_query *> split_queries = rootq->split_query(Ext_fcns, Schema, hfta_returned, ifaces_db, n_virtual_interfaces, hfta_sets[hfta_id]->n_parallel, hfta_sets[hfta_id]->parallel_idx);
1879 //for(l=0;l<split_queries.size();++l){
1880 //printf("split query %d is %s\n",l,split_queries[l]->q`uery_name.c_str());
1886 if(split_queries.size() > 0){ // should be at least one component.
1888 // Compute the number of LFTAs.
1889 int n_lfta = split_queries.size();
1890 if(hfta_returned) n_lfta--;
1891 // Check if a schemaId constraint needs to be inserted.
1893 // Process the LFTA components.
1894 for(l=0;l<n_lfta;++l){
1895 if(lfta_names.count(split_queries[l]->query_name) == 0){
1896 // Grab the lfta for global optimization.
1897 vector<tablevar_t *> tvec = split_queries[l]->query_plan[0]->get_input_tbls();
1898 string liface = tvec[0]->get_interface(); // iface queries have been resolved
1899 string lmach = tvec[0]->get_machine();
1901 vector<predicate_t *> schemaid_preds;
1902 for(int irv=0;irv<tvec.size();++irv){
1904 string schema_name = tvec[irv]->get_schema_name();
1905 string rvar_name = tvec[irv]->get_var_name();
1906 int schema_ref = tvec[irv]->get_schema_ref();
1909 interface_names.push_back(liface);
1910 machine_names.push_back(lmach);
1911 //printf("Machine is %s\n",lmach.c_str());
1913 // Check if a schemaId constraint needs to be inserted.
1914 if(schema_ref<0){ // can result from some kinds of splits
1915 schema_ref = Schema->get_table_ref(schema_name);
1917 int schema_id = Schema->get_schema_id(schema_ref); // id associated with PROTOCOL
1920 iface_t *iface = ifaces_db-> get_interface(lmach, liface, errnum, if_error);
1922 fprintf(stderr,"Error looking up the interface %s on host %s: %s\n",liface.c_str(), lmach.c_str(), if_error.c_str());
1925 if(iface->has_multiple_schemas()){
1926 if(schema_id<0){ // invalid schema_id
1927 fprintf(stderr,"Error, schema %s has no schema_id, but is sourced from multi-schema interface %s (ref'd in query %s)\n", schema_name.c_str(), liface.c_str(), split_queries[0]->query_name.c_str());
1930 vector<string> iface_schemas = iface->get_property("Schemas");
1931 if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
1932 fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
1935 // Ensure that in liface, schema_id is used for only one schema
1936 if(schema_of_schemaid.count(liface)==0){
1937 map<int, string> empty_map;
1938 schema_of_schemaid[liface] = empty_map;
1940 if(schema_of_schemaid[liface].count(schema_id)==0){
1941 schema_of_schemaid[liface][schema_id] = schema_name;
1943 if(schema_of_schemaid[liface][schema_id] != schema_name){
1944 fprintf(stderr, "ERROR, For interface %s, schema id %d is used in schemas %s and %s\n", liface.c_str(), schema_id, schema_name.c_str(), schema_of_schemaid[liface][schema_id].c_str());
1948 }else{ // single-schema interface
1949 schema_id = -1; // don't generate schema_id predicate
1950 vector<string> iface_schemas = iface->get_property("Schemas");
1951 if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
1952 fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
1955 if(iface_schemas.size()>1){
1956 fprintf(stderr, "ERROR, interface %s is a single-schema interface, but has %d elements in its Schemas list.\n", liface.c_str(), (int)iface_schemas.size());
1961 // If we need to check the schema_id, insert a predicate into the lfta.
1962 // TODO not just schema_id, the full all_schema_ids set.
1964 colref_t *schid_cr = new colref_t("schemaId");
1965 schid_cr->schema_ref = schema_ref;
1966 schid_cr->table_name = rvar_name;
1967 schid_cr->tablevar_ref = 0;
1968 schid_cr->default_table = false;
1969 scalarexp_t *schid_se = new scalarexp_t(schid_cr);
1970 data_type *schid_dt = new data_type("uint");
1971 schid_se->dt = schid_dt;
1973 string schid_str = int_to_string(schema_id);
1974 literal_t *schid_lit = new literal_t(schid_str.c_str(), LITERAL_INT);
1975 scalarexp_t *lit_se = new scalarexp_t(schid_lit);
1976 lit_se->dt = schid_dt;
1978 predicate_t *schid_pr = new predicate_t(schid_se, "=", lit_se);
1979 vector<cnf_elem *> clist;
1980 make_cnf_from_pr(schid_pr, clist);
1981 analyze_cnf(clist[0]);
1982 clist[0]->cost = 1; // cheap one comparison
1983 // cnf built, now insert it.
1984 split_queries[l]->query_plan[0]->append_to_where(clist[0]);
1986 // Specialized processing ... currently filter join
1987 string node_type = split_queries[l]->query_plan[0]->node_type();
1988 if(node_type == "filter_join"){
1989 filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
1991 fj->pred_t0.push_back(clist[0]);
1993 fj->pred_t1.push_back(clist[0]);
1995 schemaid_preds.push_back(schid_pr);
1999 // Specialized processing, currently filter join.
2000 if(schemaid_preds.size()>0){
2001 string node_type = split_queries[l]->query_plan[0]->node_type();
2002 if(node_type == "filter_join"){
2003 filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
2004 predicate_t *filter_pr = new predicate_t("OR", schemaid_preds[0], schemaid_preds[1]);
2005 vector<cnf_elem *> clist;
2006 make_cnf_from_pr(filter_pr, clist);
2007 analyze_cnf(clist[0]);
2008 clist[0]->cost = 1; // cheap one comparison
2009 fj->shared_pred.push_back(clist[0]);
2019 // Set the ht size from the recommendation, if there is one in the rec file
2020 if(lfta_htsize.count(split_queries[l]->query_name)>0){
2021 split_queries[l]->query_plan[0]->set_definition("aggregate_slots",int_to_string(lfta_htsize[split_queries[l]->query_name]));
2025 lfta_names[split_queries[l]->query_name] = lfta_list.size();
2026 split_queries[l]->set_gid(lfta_list.size()); // set lfta global id
2027 lfta_list.push_back(split_queries[l]);
2028 lfta_mach_lists[lmach].push_back(split_queries[l]);
2030 // The following is a hack,
2031 // as I should be generating LFTA code through
2032 // the stream_query object.
2034 split_queries[l]->query_plan[0]->bind_to_schema(Schema);
2036 // split_queries[l]->query_plan[0]->definitions = split_queries[l]->defines;
2039 // Create query description to embed in lfta.c
2040 string lfta_schema_str = split_queries[l]->make_schema();
2041 string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
2043 // get NIC capabilities.
2045 nic_property *nicprop = NULL;
2046 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2047 if(iface_codegen_type.size()){
2048 nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2050 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2055 lfta_val[lmach] += generate_lfta_block(split_queries[l]->query_plan[0], Schema, split_queries[l]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop);
2058 snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema));
2059 query_names.push_back(split_queries[l]->query_name);
2060 mach_query_names[lmach].push_back(query_names.size()-1);
2061 // NOTE: I will assume a 1-1 correspondance between
2062 // mach_query_names[lmach] and lfta_mach_lists[lmach]
2063 // where mach_query_names[lmach][i] contains the index into
2064 // query_names, which names the lfta, and
2065 // mach_query_names[lmach][i] is the stream_query * of the
2066 // corresponding lfta.
2067 // Later, lfta_iface_qnames are the query names matching lfta_iface_lists
2071 // check if lfta is reusable
2072 // FTA is reusable if reusable option is set explicitly or it doesn't have any parameters
2074 bool lfta_reusable = false;
2075 if (split_queries[l]->query_plan[0]->get_val_of_def("reusable") == "yes" ||
2076 split_queries[l]->query_plan[0]->get_param_tbl()->size() == 0) {
2077 lfta_reusable = true;
2079 lfta_reuse_options.push_back(lfta_reusable);
2081 // LFTA will inherit the liveness timeout specification from the containing query
2082 // it is too conservative as lfta are expected to spend less time per tuple
2085 // extract liveness timeout from query definition
2086 int liveness_timeout = atoi(split_queries[l]->query_plan[0]->get_val_of_def("liveness_timeout").c_str());
2087 if (!liveness_timeout) {
2088 // fprintf(stderr, "WARNING: No liveness timeout specified for lfta %s, using default value of %d\n",
2089 // split_queries[l]->query_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2090 liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2092 lfta_liveness_timeouts.push_back(liveness_timeout);
2094 // Add it to the schema
2095 table_def *td = split_queries[l]->get_output_tabledef();
2096 Schema->append_table(td);
2097 //printf("added lfta %d (%s)\n",l,split_queries[l]->query_name.c_str());
2102 // If the output is lfta-only, dump out the query name.
2103 if(split_queries.size() == 1 && !hfta_returned){
2104 if(output_query_names ){
2105 fprintf(query_name_output,"%s L\n",split_queries[0]->query_name.c_str());
2109 fprintf(stderr,"query name is %s\n",split_queries[0]->query_name.c_str());
2114 // output schema summary
2115 if(output_schema_summary){
2116 dump_summary(split_queries[0]);
2122 if(hfta_returned){ // query also has an HFTA component
2123 int hfta_nbr = split_queries.size()-1;
2125 hfta_list.push_back(split_queries[hfta_nbr]);
2127 // report on generated query names
2128 if(output_query_names){
2129 string hfta_name =split_queries[hfta_nbr]->query_name;
2130 fprintf(query_name_output,"%s H\n",hfta_name.c_str());
2131 for(l=0;l<hfta_nbr;++l){
2132 string lfta_name =split_queries[l]->query_name;
2133 fprintf(query_name_output,"%s L\n",lfta_name.c_str());
2137 // fprintf(stderr,"query names are ");
2138 // for(l=0;l<hfta_nbr;++l){
2139 // if(l>0) fprintf(stderr,",");
2140 // string fta_name =split_queries[l]->query_name;
2141 // fprintf(stderr," %s",fta_name.c_str());
2143 // fprintf(stderr,"\n");
2148 fprintf(stderr,"ERROR, query in file %s has a bug.\n",qnodes[i]->file.c_str());
2149 fprintf(stderr,"%s\n",rootq->get_error_str().c_str());
2156 //-----------------------------------------------------------------
2157 // Compute and propagate the SE in PROTOCOL fields compute a field.
2158 //-----------------------------------------------------------------
2160 for(i=0;i<lfta_list.size();i++){
2161 lfta_list[i]->generate_protocol_se(sq_map, Schema);
2162 sq_map[lfta_list[i]->get_sq_name()] = lfta_list[i];
2164 for(i=0;i<hfta_list.size();i++){
2165 hfta_list[i]->generate_protocol_se(sq_map, Schema);
2166 sq_map[hfta_list[i]->get_sq_name()] = hfta_list[i];
2171 //------------------------------------------------------------------------
2172 // Perform individual FTA optimizations
2173 //-----------------------------------------------------------------------
2175 if (partitioned_mode) {
2177 // open partition definition file
2178 string part_fname = config_dir_path + "partition.txt";
2180 FILE* partfd = fopen(part_fname.c_str(), "r");
2182 fprintf(stderr,"ERROR, unable to open partition definition file %s\n",part_fname.c_str());
2185 PartnParser_setfileinput(partfd);
2186 if (PartnParserparse()) {
2187 fprintf(stderr,"ERROR, unable to parse partition definition file %s\n",part_fname.c_str());
2194 print_hftas("preopt_hfta_info.txt", hfta_list, lfta_names, query_names, interface_names);
2196 int num_hfta = hfta_list.size();
2197 for(i=0; i < hfta_list.size(); ++i){
2198 hfta_list[i]->optimize(hfta_list, lfta_names, interface_names, machine_names, Ext_fcns, Schema, ifaces_db, partn_parse_result);
2201 // Add all new hftas to schema
2202 for(i=num_hfta; i < hfta_list.size(); ++i){
2203 table_def *td = hfta_list[i]->get_output_tabledef();
2204 Schema->append_table(td);
2207 print_hftas("postopt_hfta_info.txt", hfta_list, lfta_names, query_names, interface_names);
2211 //------------------------------------------------------------------------
2212 // Do global (cross-fta) optimization
2213 //-----------------------------------------------------------------------
2220 set<string> extra_external_libs;
2222 for(i=0;i<hfta_list.size();++i){ // query also has an HFTA component
2225 // build hfta file name, create output
2226 if(numeric_hfta_flname){
2227 sprintf(tmpstr,"hfta_%d",hfta_count);
2228 hfta_names.push_back(tmpstr);
2229 sprintf(tmpstr,"hfta_%d.cc",hfta_count);
2231 sprintf(tmpstr,"hfta_%s",hfta_list[i]->query_name.c_str());
2232 hfta_names.push_back(tmpstr);
2233 sprintf(tmpstr,"hfta_%s.cc",hfta_list[i]->query_name.c_str());
2235 FILE *hfta_fl = fopen(tmpstr,"w");
2236 if(hfta_fl == NULL){
2237 fprintf(stderr,"ERROR can't open fta output file %s\n",tmpstr);
2240 fprintf(hfta_fl,"%s\n\n",hfta_list[i]->generate_hfta(Schema, Ext_fcns, opviews, distributed_mode).c_str());
2242 // If there is a field verifier, warn about
2243 // lack of compatibility
2244 // NOTE : this code assumes that visible non-lfta queries
2245 // are those at the root of a stream query.
2246 string hfta_comment;
2248 string hfta_namespace;
2249 if(hfta_list[i]->defines.count("comment")>0)
2250 hfta_comment = hfta_list[i]->defines["comment"];
2251 if(hfta_list[i]->defines.count("Comment")>0)
2252 hfta_comment = hfta_list[i]->defines["Comment"];
2253 if(hfta_list[i]->defines.count("COMMENT")>0)
2254 hfta_comment = hfta_list[i]->defines["COMMENT"];
2255 if(hfta_list[i]->defines.count("title")>0)
2256 hfta_title = hfta_list[i]->defines["title"];
2257 if(hfta_list[i]->defines.count("Title")>0)
2258 hfta_title = hfta_list[i]->defines["Title"];
2259 if(hfta_list[i]->defines.count("TITLE")>0)
2260 hfta_title = hfta_list[i]->defines["TITLE"];
2261 if(hfta_list[i]->defines.count("namespace")>0)
2262 hfta_namespace = hfta_list[i]->defines["namespace"];
2263 if(hfta_list[i]->defines.count("Namespace")>0)
2264 hfta_namespace = hfta_list[i]->defines["Namespace"];
2265 if(hfta_list[i]->defines.count("NAMESPACE")>0)
2266 hfta_namespace = hfta_list[i]->defines["NAMESPACE"];
2268 if(field_verifier != NULL){
2270 if(hfta_comment == "")
2271 warning_str += "\tcomment not found.\n";
2273 // Obsolete stuff that Carsten wanted
2274 // if(hfta_title == "")
2275 // warning_str += "\ttitle not found.\n";
2276 // if(hfta_namespace == "")
2277 // warning_str += "\tnamespace not found.\n";
2280 // There is a get_tbl_keys method implemented for qp_nodes,
2281 // integrate it into stream_query, then call it to find keys,
2282 // and annotate fields with their key-ness.
2283 // If there is a "keys" property in the defines block, override anything returned
2284 // from the automated analysis
2286 vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2288 for(fi=0;fi<flds.size();fi++){
2289 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
2291 if(warning_str != "")
2292 fprintf(stderr,"Warning, in HFTA stream %s:\n%s",
2293 hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(),warning_str.c_str());
2296 // Get the fields in this query
2297 vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2299 // do key processing
2300 string hfta_keys_s = "";
2301 if(hfta_list[i]->defines.count("keys")>0)
2302 hfta_keys_s = hfta_list[i]->defines["keys"];
2303 if(hfta_list[i]->defines.count("Keys")>0)
2304 hfta_keys_s = hfta_list[i]->defines["Keys"];
2305 if(hfta_list[i]->defines.count("KEYS")>0)
2306 hfta_keys_s = hfta_list[i]->defines["KEYS"];
2307 string xtra_keys_s = "";
2308 if(hfta_list[i]->defines.count("extra_keys")>0)
2309 xtra_keys_s = hfta_list[i]->defines["extra_keys"];
2310 if(hfta_list[i]->defines.count("Extra_Keys")>0)
2311 xtra_keys_s = hfta_list[i]->defines["Extra_Keys"];
2312 if(hfta_list[i]->defines.count("EXTRA_KEYS")>0)
2313 xtra_keys_s = hfta_list[i]->defines["EXTRA_KEYS"];
2315 vector<string> hfta_keys;
2316 vector<string> partial_keys;
2317 vector<string> xtra_keys;
2318 if(hfta_keys_s==""){
2319 hfta_keys = hfta_list[i]->get_tbl_keys(partial_keys);
2320 if(xtra_keys_s.size()>0){
2321 xtra_keys = split_string(xtra_keys_s, ',');
2323 for(int xi=0;xi<xtra_keys.size();++xi){
2324 if(find(hfta_keys.begin(), hfta_keys.end(), xtra_keys[xi]) == hfta_keys.end()){
2325 hfta_keys.push_back(xtra_keys[xi]);
2329 hfta_keys = split_string(hfta_keys_s, ',');
2331 // validate that all of the keys exist in the output.
2332 // (exit on error, as it's a bad specification)
2333 vector<string> missing_keys;
2334 for(int ki=0;ki<hfta_keys.size(); ++ki){
2336 for(fi=0;fi<flds.size();++fi){
2337 if(hfta_keys[ki] == flds[fi]->get_name())
2341 missing_keys.push_back(hfta_keys[ki]);
2343 if(missing_keys.size()>0){
2344 fprintf(stderr, "Error, HFTA stream %s has keys defined which aren't in the outputg:", hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
2345 for(int hi=0; hi<missing_keys.size(); ++hi){
2346 fprintf(stderr," %s", missing_keys[hi].c_str());
2348 fprintf(stderr,"\n");
2352 fprintf(qtree_output,"\t<HFTA name='%s'>\n",hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
2353 if(hfta_comment != "")
2354 fprintf(qtree_output,"\t\t<Description value='%s' />\n",hfta_comment.c_str());
2355 if(hfta_title != "")
2356 fprintf(qtree_output,"\t\t<Title value='%s' />\n",hfta_title.c_str());
2357 if(hfta_namespace != "")
2358 fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",hfta_namespace.c_str());
2359 fprintf(qtree_output,"\t\t<FileName value='%s' />\n",tmpstr);
2360 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2362 // write info about fields to qtree.xml
2364 for(fi=0;fi<flds.size();fi++){
2365 fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
2366 if(flds[fi]->get_modifier_list()->size()){
2367 fprintf(qtree_output,"mods='%s'", flds[fi]->get_modifier_list()->to_string().c_str());
2369 fprintf(qtree_output," />\n");
2372 for(int hi=0;hi<hfta_keys.size();++hi){
2373 fprintf(qtree_output, "\t\t<Key value='%s'/>\n",hfta_keys[hi].c_str());
2375 for(int hi=0;hi<partial_keys.size();++hi){
2376 fprintf(qtree_output, "\t\t<PartialKey value='%s'/>\n",partial_keys[hi].c_str());
2378 for(int hi=0;hi<xtra_keys.size();++hi){
2379 fprintf(qtree_output, "\t\t<ExtraKey value='%s'/>\n",xtra_keys[hi].c_str());
2383 // extract liveness timeout from query definition
2384 int liveness_timeout = atoi(hfta_list[i]->query_plan[hfta_list[i]->qhead]->get_val_of_def("liveness_timeout").c_str());
2385 if (!liveness_timeout) {
2386 // fprintf(stderr, "WARNING: No liveness timeout specified for hfta %s, using default value of %d\n",
2387 // hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2388 liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2390 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", liveness_timeout);
2392 vector<tablevar_t *> tmp_tv = hfta_list[i]->get_input_tables();
2394 for(itv=0;itv<tmp_tv.size();++itv){
2395 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",tmp_tv[itv]->get_schema_name().c_str());
2397 string ifrs = hfta_list[i]->collect_refd_ifaces();
2399 fprintf(qtree_output,"\t\t<Ifaces_referenced value='%s'/>\n",ifrs.c_str());
2401 fprintf(qtree_output,"\t</HFTA>\n");
2405 // debug only -- do code generation to catch generation-time errors.
2406 hfta_list[i]->generate_hfta(Schema, Ext_fcns,opviews, distributed_mode);
2409 hfta_count++; // for hfta file names with numeric suffixes
2411 hfta_list[i]->get_external_libs(extra_external_libs);
2415 string ext_lib_string;
2416 set<string>::iterator ssi_el;
2417 for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_el)
2418 ext_lib_string += (*ssi_el)+" ";
2422 // Report on the set of operator views
2423 for(i=0;i<opviews.size();++i){
2424 opview_entry *opve = opviews.get_entry(i);
2425 fprintf(qtree_output,"\t<UDOP name='%s' >\n",opve->view_name.c_str());
2426 fprintf(qtree_output,"\t\t<FileName value='%s' />\n",opve->exec_fl.c_str());
2427 fprintf(qtree_output,"\t\t<Parent value='%s' />\n",opve->root_name.c_str());
2428 fprintf(qtree_output,"\t\t<Alias value='%s' />\n",opve->udop_alias.c_str());
2429 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2431 if (!opve->liveness_timeout) {
2432 // fprintf(stderr, "WARNING: No liveness timeout specified for view %s, using default value of %d\n",
2433 // opve->view_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2434 opve->liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2436 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", opve->liveness_timeout);
2438 for(j=0;j<opve->subq_names.size();j++)
2439 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",opve->subq_names[j].c_str());
2440 fprintf(qtree_output,"\t</UDOP>\n");
2444 //-----------------------------------------------------------------
2446 // Create interface-specific meta code files.
2447 // first, open and parse the interface resources file.
2448 ifaces_db = new ifq_t();
2450 if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
2451 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
2452 ifx_fname.c_str(), ierr.c_str());
2456 map<string, vector<stream_query *> >::iterator svsi;
2457 for(svsi=lfta_mach_lists.begin(); svsi!=lfta_mach_lists.end(); ++svsi){
2458 string lmach = (*svsi).first;
2460 // For this machine, create a set of lftas per interface.
2461 vector<stream_query *> mach_lftas = (*svsi).second;
2462 map<string, vector<stream_query *> > lfta_iface_lists;
2464 for(li=0;li<mach_lftas.size();++li){
2465 vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
2466 string lfta_iface = tvec[0]->get_interface();
2467 lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2470 map<string, vector<stream_query *> >::iterator lsvsi;
2471 for(lsvsi=lfta_iface_lists.begin(); lsvsi!=lfta_iface_lists.end(); ++lsvsi){
2473 string liface = (*lsvsi).first;
2474 vector<stream_query *> iface_lftas = (*lsvsi).second;
2475 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2476 if(iface_codegen_type.size()){
2477 nic_property *nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2479 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2482 string mcs = generate_nic_code(iface_lftas, nicprop);
2485 mcf_flnm = lmach + "_"+liface+".mcf";
2487 mcf_flnm = hostname + "_"+liface+".mcf";
2489 if((mcf_fl = fopen(mcf_flnm.c_str(),"w")) == NULL){
2490 fprintf(stderr,"Can't open output file %s\n%s\n",mcf_flnm.c_str(),strerror(errno));
2493 fprintf(mcf_fl,"%s",mcs.c_str());
2495 //printf("mcs of machine %s, iface %s of type %s is \n%s\n",
2496 //lmach.c_str(), liface.c_str(),iface_codegen_type[0].c_str(), mcs.c_str());
2505 //-----------------------------------------------------------------
2508 // Find common filter predicates in the LFTAs.
2509 // in addition generate structs to store the temporal attributes unpacked by prefilter
2511 map<string, vector<stream_query *> >::iterator ssqi;
2512 for(ssqi=lfta_mach_lists.begin(); ssqi!=lfta_mach_lists.end(); ++ssqi){
2514 string lmach = (*ssqi).first;
2515 bool packed_return = false;
2519 // The LFTAs of this machine.
2520 vector<stream_query *> mach_lftas = (*ssqi).second;
2521 // break up on a per-interface basis.
2522 map<string, vector<stream_query *> > lfta_iface_lists;
2523 map<string, vector<int> > lfta_iface_qname_ix; // need the query name
2525 for(li=0;li<mach_lftas.size();++li){
2526 vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
2527 string lfta_iface = tvec[0]->get_interface();
2528 lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2529 lfta_iface_qname_ix[lfta_iface].push_back(mach_query_names[lmach][li]);
2533 // Are the return values "packed"?
2534 // This should be done on a per-interface basis.
2535 // But this is defunct code for gs-lite
2536 for(li=0;li<mach_lftas.size();++li){
2537 vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
2538 string liface = tvec[0]->get_interface();
2539 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2540 if(iface_codegen_type.size()){
2541 if(npdb->get_nic_prop_val(iface_codegen_type[0],"Return",erri) == "Packed"){
2542 packed_return = true;
2548 // Separate lftas by interface, collect results on a per-interface basis.
2550 vector<cnf_set *> no_preds; // fallback if there is no prefilter
2551 map<string, vector<cnf_set *> > prefilter_preds;
2552 set<unsigned int> pred_ids; // this can be global for all interfaces
2553 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2554 string liface = (*mvsi).first;
2555 vector<cnf_set *> empty_list;
2556 prefilter_preds[liface] = empty_list;
2557 if(! packed_return){
2558 get_common_lfta_filter(lfta_iface_lists[liface], Schema,Ext_fcns, prefilter_preds[liface], pred_ids);
2561 // get NIC capabilities. (Is this needed?)
2562 nic_property *nicprop = NULL;
2563 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2564 if(iface_codegen_type.size()){
2565 nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2567 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2574 // Now that we know the prefilter preds, generate the lfta code.
2575 // Do this for all lftas in this machine.
2576 for(li=0;li<mach_lftas.size();++li){
2577 set<unsigned int> subsumed_preds;
2578 set<unsigned int>::iterator sii;
2580 for(sii=pred_ids.begin();sii!=pred_ids.end();++sii){
2582 if((pid>>16) == li){
2583 subsumed_preds.insert(pid & 0xffff);
2587 string lfta_schema_str = mach_lftas[li]->make_schema();
2588 string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
2589 nic_property *nicprop = NULL; // no NIC properties?
2590 lfta_val[lmach] += generate_lfta_block(mach_lftas[li]->query_plan[0], Schema, mach_lftas[li]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop,subsumed_preds);
2594 // generate structs to store the temporal attributes
2595 // unpacked by prefilter
2596 col_id_set temp_cids;
2597 get_prefilter_temporal_cids(lfta_mach_lists[lmach], temp_cids);
2598 lfta_prefilter_val[lmach] = generate_lfta_prefilter_struct(temp_cids, Schema);
2600 // Compute the lfta bit signatures and the lfta colrefs
2601 // do this on a per-interface basis
2603 lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2605 map<string, vector<long long int> > lfta_sigs; // used again later
2606 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2607 string liface = (*mvsi).first;
2608 vector<long long int> empty_list;
2609 lfta_sigs[liface] = empty_list;
2611 vector<col_id_set> lfta_cols;
2612 vector<int> lfta_snap_length;
2613 for(li=0;li<lfta_iface_lists[liface].size();++li){
2614 unsigned long long int mask=0, bpos=1;
2616 for(f_pos=0;f_pos<prefilter_preds[liface].size();++f_pos){
2617 if(prefilter_preds[liface][f_pos]->lfta_id.count(li))
2621 lfta_sigs[liface].push_back(mask);
2622 lfta_cols.push_back(lfta_iface_lists[liface][li]->query_plan[0]->get_colrefs(true,Schema));
2623 lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema));
2626 //for(li=0;li<mach_lftas.size();++li){
2627 //printf("lfta %d, mask is %llu\n",li,lfta_sigs[li]);
2628 //col_id_set::iterator tcisi;
2629 //for(tcisi=lfta_cols[li].begin(); tcisi!=lfta_cols[li].end();++tcisi){
2630 //printf("\t%s %d (%d)\n",(*tcisi).field.c_str(),(*tcisi).schema_ref, (*tcisi).tblvar_ref);
2635 // generate the prefilter
2636 // Do this on a per-interface basis, except for the #define
2638 // lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2639 lfta_val[lmach] += generate_lfta_prefilter(prefilter_preds[liface], temp_cids, Schema, Ext_fcns, lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2641 lfta_val[lmach] += generate_lfta_prefilter(no_preds, temp_cids, Schema,Ext_fcns, lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2646 // Generate interface parameter lookup function
2647 lfta_val[lmach] += "// lookup interface properties by name\n";
2648 lfta_val[lmach] += "// mulitple values of given property returned in the same string separated by commas\n";
2649 lfta_val[lmach] += "// returns NULL if given property does not exist\n";
2650 lfta_val[lmach] += "gs_sp_t get_iface_properties (const gs_sp_t iface_name, const gs_sp_t property_name) {\n";
2652 // collect a list of interface names used by queries running on this host
2653 set<std::string> iface_names;
2654 for(i=0;i<mach_query_names[lmach].size();i++){
2655 int mi = mach_query_names[lmach][i];
2656 stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2658 if(interface_names[mi]=="")
2659 iface_names.insert("DEFAULTDEV");
2661 iface_names.insert(interface_names[mi]);
2664 // generate interface property lookup code for every interface
2665 set<std::string>::iterator sir;
2666 for (sir = iface_names.begin(); sir != iface_names.end(); sir++) {
2667 if (sir == iface_names.begin())
2668 lfta_val[lmach] += "\tif (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2670 lfta_val[lmach] += "\t} else if (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2672 // iterate through interface properties
2673 vector<string> iface_properties = ifaces_db->get_iface_properties(lmach,*sir,erri,err_str);
2675 fprintf(stderr,"ERROR cannot retrieve interface properties for %s.%s, %s\n",lmach.c_str(), sir->c_str(), err_str.c_str());
2678 if (iface_properties.empty())
2679 lfta_val[lmach] += "\t\treturn NULL;\n";
2681 for (int i = 0; i < iface_properties.size(); ++i) {
2683 lfta_val[lmach] += "\t\tif (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2685 lfta_val[lmach] += "\t\t} else if (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2687 // combine all values for the interface property using comma separator
2688 vector<string> vals = ifaces_db->get_iface_vals(lmach,*sir,iface_properties[i],erri,err_str);
2689 lfta_val[lmach] += "\t\t\treturn \"";
2690 for (int j = 0; j < vals.size(); ++j) {
2691 lfta_val[lmach] += vals[j];
2692 if (j != vals.size()-1)
2693 lfta_val[lmach] += ",";
2695 lfta_val[lmach] += "\";\n";
2697 lfta_val[lmach] += "\t\t} else\n";
2698 lfta_val[lmach] += "\t\t\treturn NULL;\n";
2701 lfta_val[lmach] += "\t} else\n";
2702 lfta_val[lmach] += "\t\treturn NULL;\n";
2703 lfta_val[lmach] += "}\n\n";
2706 // Generate a full list of FTAs for clearinghouse reference
2707 lfta_val[lmach] += "// list of FTAs clearinghouse expects to register themselves\n";
2708 lfta_val[lmach] += "const gs_sp_t fta_names[] = {";
2710 for (i = 0; i < query_names.size(); ++i) {
2712 lfta_val[lmach] += ", ";
2713 lfta_val[lmach] += "\"" + query_names[i] + "\"";
2715 for (i = 0; i < hfta_list.size(); ++i) {
2716 lfta_val[lmach] += ", \"" + hfta_list[i]->query_name + "\"";
2718 lfta_val[lmach] += ", NULL};\n\n";
2721 // Add the initialization function to lfta.c
2722 // Change to accept the interface name, and
2723 // set the prefilter function accordingly.
2724 // see the example in demo/err2
2725 lfta_val[lmach] += "void fta_init(gs_sp_t device){\n";
2727 // for(i=0;i<mach_query_names[lmach].size();i++)
2728 // int mi = mach_query_names[lmach][i];
2729 // stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2731 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2732 string liface = (*mvsi).first;
2733 vector<stream_query *> lfta_list = (*mvsi).second;
2734 for(i=0;i<lfta_list.size();i++){
2735 stream_query *lfta_sq = lfta_list[i];
2736 int mi = lfta_iface_qname_ix[liface][i];
2738 fprintf(stderr,"interface %s, LFTA %d, snap length is %d\n",liface.c_str(),mi,snap_lengths[mi]);
2740 string this_iface = "DEFAULTDEV";
2741 if(interface_names[mi]!="")
2742 this_iface = '"'+interface_names[mi]+'"';
2743 lfta_val[lmach] += "\tif(!strcmp(device,"+this_iface+"))\n";
2744 lfta_val[lmach] += "\t\tfta_register(\""+query_names[mi]+"\", " + (lfta_reuse_options[mi]?"1":"0") + ", ";
2745 // if(interface_names[mi]=="")
2746 // lfta_val[lmach]+="DEFAULTDEV";
2748 // lfta_val[lmach]+='"'+interface_names[mi]+'"';
2749 lfta_val[lmach] += this_iface;
2752 lfta_val[lmach] += ", "+generate_alloc_name(query_names[mi])
2753 +"\n#ifndef LFTA_IN_NIC\n\t,"+generate_schema_string_name(query_names[mi])
2755 sprintf(tmpstr,",%d",snap_lengths[mi]);
2756 lfta_val[lmach] += tmpstr;
2758 // unsigned long long int mask=0, bpos=1;
2760 // for(f_pos=0;f_pos<prefilter_preds.size();++f_pos){
2761 // if(prefilter_preds[f_pos]->lfta_id.count(i))
2763 // bpos = bpos << 1;
2767 // sprintf(tmpstr,",%lluull",mask);
2768 sprintf(tmpstr,",%lluull",lfta_sigs[liface][i]);
2769 lfta_val[lmach]+=tmpstr;
2771 lfta_val[lmach] += ",0ull";
2774 lfta_val[lmach] += ");\n";
2778 // End of lfta prefilter stuff
2779 // --------------------------------------------------
2781 // If there is a field verifier, warn about
2782 // lack of compatibility
2783 string lfta_comment;
2785 string lfta_namespace;
2786 map<string,string> ldefs = lfta_sq->query_plan[0]->get_definitions();
2787 if(ldefs.count("comment")>0)
2788 lfta_comment = lfta_sq->defines["comment"];
2789 if(ldefs.count("Comment")>0)
2790 lfta_comment = lfta_sq->defines["Comment"];
2791 if(ldefs.count("COMMENT")>0)
2792 lfta_comment = lfta_sq->defines["COMMENT"];
2793 if(ldefs.count("title")>0)
2794 lfta_title = lfta_sq->defines["title"];
2795 if(ldefs.count("Title")>0)
2796 lfta_title = lfta_sq->defines["Title"];
2797 if(ldefs.count("TITLE")>0)
2798 lfta_title = lfta_sq->defines["TITLE"];
2799 if(ldefs.count("NAMESPACE")>0)
2800 lfta_namespace = lfta_sq->defines["NAMESPACE"];
2801 if(ldefs.count("Namespace")>0)
2802 lfta_namespace = lfta_sq->defines["Namespace"];
2803 if(ldefs.count("namespace")>0)
2804 lfta_namespace = lfta_sq->defines["namespace"];
2806 string lfta_ht_size;
2807 if(lfta_sq->query_plan[0]->node_type()== "sgah_qpn" || lfta_sq->query_plan[0]->node_type()== "rsgah_qpn")
2808 lfta_ht_size = int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE);
2809 if(ldefs.count("aggregate_slots")>0){
2810 lfta_ht_size = ldefs["aggregate_slots"];
2813 // NOTE : I'm assuming that visible lftas do not start with _fta.
2814 // -- will fail for non-visible simple selection queries.
2815 if(field_verifier != NULL && query_names[mi].substr(0,1) != "_"){
2817 if(lfta_comment == "")
2818 warning_str += "\tcomment not found.\n";
2819 // Obsolete stuff that carsten wanted
2820 // if(lfta_title == "")
2821 // warning_str += "\ttitle not found.\n";
2822 // if(lfta_namespace == "")
// NOTE(review): elided listing (embedded source line numbers jump); the
// comments below describe only the statements that are visible here.
2823 // warning_str += "\tnamespace not found.\n";
// Verify each output field of this LFTA against the field list verifier,
// accumulating any problems into warning_str.
2825 vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
2827 for(fi=0;fi<flds.size();fi++){
2828 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
// Report accumulated field warnings, tagged with the query's name.
2830 if(warning_str != "")
2831 fprintf(stderr,"Warning, in LFTA stream %s:\n%s",
2832 query_names[mi].c_str(),warning_str.c_str());
2836 // Create qtree output
// Emit this LFTA's entry in the qtree XML: name first, then the optional
// description / title / namespace / hash-table-size attributes.
2837 fprintf(qtree_output,"\t<LFTA name='%s' >\n",query_names[mi].c_str());
2838 if(lfta_comment != "")
2839 fprintf(qtree_output,"\t\t<Description value='%s' />\n",lfta_comment.c_str());
2840 if(lfta_title != "")
2841 fprintf(qtree_output,"\t\t<Title value='%s' />\n",lfta_title.c_str());
2842 if(lfta_namespace != "")
2843 fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",lfta_namespace.c_str());
2844 if(lfta_ht_size != "")
2845 fprintf(qtree_output,"\t\t<HtSize value='%s' />\n",lfta_ht_size.c_str());
// Host is either the explicit machine name (lmach) or the local hostname;
// the two fprintf's are presumably alternate branches of an elided if/else
// — TODO confirm against the full source.
2847 fprintf(qtree_output,"\t\t<Host value='%s' />\n",lmach.c_str());
2849 fprintf(qtree_output,"\t\t<Host value='%s' />\n",hostname.c_str());
2850 fprintf(qtree_output,"\t\t<Interface value='%s' />\n",interface_names[mi].c_str());
2851 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",interface_names[mi].c_str());
2852 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2853 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", lfta_liveness_timeouts[mi]);
2854 // write info about fields to qtree.xml
2855 vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
2857 for(fi=0;fi<flds.size();fi++){
2858 fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
// Only emit the mods attribute when the field actually has modifiers.
2859 if(flds[fi]->get_modifier_list()->size()){
2860 fprintf(qtree_output,"mods='%s' ", flds[fi]->get_modifier_list()->to_string().c_str());
2862 fprintf(qtree_output," />\n");
2864 fprintf(qtree_output,"\t</LFTA>\n");
// Append the generated fta_init prefilter dispatch code: select the
// per-interface prefilter function by strcmp'ing the device name, and
// report an error if no interface matched.
2870 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2871 string liface = (*mvsi).first;
2873 " if (!strcmp(device, \""+liface+"\")) \n"
2874 " lfta_prefilter = &lfta_prefilter_"+liface+"; \n"
2878 " if(lfta_prefilter == NULL){\n"
2879 " fprintf(stderr, \"Error, interface %s not found in fta_init.\\n\",device);\n"
2886 lfta_val[lmach] += "}\n\n";
// Unless this is a debug-only or hfta-only run, write the accumulated
// lfta source (header + prefilter code + body) to <host>_lfta.c.
2888 if(!(debug_only || hfta_only) ){
2891 lfta_flnm = lmach + "_lfta.c";
2893 lfta_flnm = hostname + "_lfta.c";
2894 if((lfta_out = fopen(lfta_flnm.c_str(),"w")) == NULL){
2895 fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
2898 fprintf(lfta_out,"%s",lfta_header.c_str());
2899 fprintf(lfta_out,"%s",lfta_prefilter_val[lmach].c_str());
2900 fprintf(lfta_out,"%s",lfta_val[lmach].c_str());
2905 // Say what are the operators which must execute
2906 if(opviews.size()>0)
2907 fprintf(stderr,"The queries use the following external operators:\n");
2908 for(i=0;i<opviews.size();++i){
2909 opview_entry *opv = opviews.get_entry(i);
2910 fprintf(stderr,"\t%s\n",opv->view_name.c_str());
// Generate the Makefile / run scripts for everything produced above.
2914 generate_makefile(input_file_names, nfiles, hfta_names, opviews,
2915 machine_names, schema_file_name,
2917 ifaces_db, config_dir_path,use_pads,ext_lib_string, rts_hload);
// Close the query-tree XML document.
2920 fprintf(qtree_output,"</QueryNodes>\n");
2925 ////////////////////////////////////////////////////////////
// Emits the build/run/stop scaffolding for the generated query set:
//   Makefile — builds the host lfta runtime ("rts") and each hfta binary
//   runit    — starts gshub.py, rts, the hftas, and any UDOP view operators
//   stopit   — kills the process groups recorded in gs.pids
// NOTE(review): elided listing (embedded source line numbers jump); the
// comments describe only the statements that are visible here.
2927 void generate_makefile(vector<string> &input_file_names, int nfiles,
2928 vector<string> &hfta_names, opview_set &opviews,
2929 vector<string> &machine_names,
2930 string schema_file_name,
2931 vector<string> &interface_names,
2932 ifq_t *ifdb, string &config_dir_path,
2935 map<string, vector<int> > &rts_hload
// Fold the config dir into the "-C <dir>" flag used by the generated
// translate_fta rules below.
2939 if(config_dir_path != ""){
2940 config_dir_path = "-C "+config_dir_path;
// Probe for optional libraries; their presence toggles linker flags below.
2944 bool libz_exists = stat((root_path+"/lib/libz.a").c_str(),&sb) >= 0;
2945 bool libast_exists = stat((root_path+"/lib/libast.a").c_str(),&sb) >= 0;
2947 // if(libz_exists && !libast_exists){
2948 // fprintf(stderr,"Configuration ERROR: ../../lib/libz.a exists but ../../lib/libast.a dows not.\n");
2952 // Get set of operator executable files to run
2954 set<string>::iterator ssi;
2955 for(i=0;i<opviews.size();++i){
2956 opview_entry *opv = opviews.get_entry(i);
2957 if(opv->exec_fl != "") op_fls.insert(opv->exec_fl);
// ---------------- Makefile ----------------
2960 FILE *outfl = fopen("Makefile", "w");
2962 fprintf(stderr,"Can't open Makefile for write, exiting.\n");
// Compiler definitions: hfta code is C++ (CPP), the lfta runtime is C (CC).
2967 ("CPP= g++ -O3 -g -I "+root_path+"/include -I "+root_path+"/include/hfta\n"
2968 "CC= gcc -O3 -g -I . -I "+root_path+"/include -I "+root_path+"/include/lfta"
2972 fprintf(outfl," -DLFTA_STATS");
2974 // Gather the set of interfaces
2975 // Also, gather "base interface names" for use in computing
2976 // the hash splitting to virtual interfaces.
2977 // TODO : must update to hanndle machines
2979 set<string> base_vifaces; // base interfaces of virtual interfaces
2980 map<string, string> ifmachines;
2981 map<string, string> ifattrs;
2982 for(i=0;i<interface_names.size();++i){
2983 ifaces.insert(interface_names[i]);
2984 ifmachines[interface_names[i]] = machine_names[i];
// Virtual interfaces look like <base>X<n>: strip from the last 'X' to
// recover the base interface name.
2986 size_t Xpos = interface_names[i].find_last_of("X");
2987 if(Xpos!=string::npos){
2988 string iface = interface_names[i].substr(0,Xpos);
2989 base_vifaces.insert(iface);
2991 // get interface attributes and add them to the list
2994 // Do we need to include protobuf libraries?
2995 // TODO Move to the interface library: get the libraries to include
2996 // for an interface type
2998 bool use_proto = false;
// Scan each interface's InterfaceType values for "PROTO".
3001 for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
3002 string ifnm = (*ssi);
3003 vector<string> ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "InterfaceType", erri, err_str);
3004 for(int ift_i=0;ift_i<ift.size();ift_i++){
3005 if(ift[ift_i]=="PROTO"){
// List every hfta binary in the top-level target's dependencies.
3015 for(i=0;i<hfta_names.size();++i)
3016 fprintf(outfl," %s",hfta_names[i].c_str());
// Link rule for the lfta runtime process "rts".
3020 "rts: lfta.o "+root_path+"/lib/libgscphost.a "+root_path+"/lib/libgscplftaaux.a "+root_path+"/lib/libgscprts.a "+root_path+"/lib/libclearinghouse.a\n"
3021 "\tg++ -O3 -g -o rts lfta.o ").c_str(), outfl);
3023 fprintf(outfl,"-L. ");
3025 ("-L"+root_path+"/lib -lgscplftaaux ").c_str(), outfl);
// Optional link flags, gated on the feature probes above (the guarding
// conditionals are partially elided in this listing).
3027 fprintf(outfl,"-lgscppads -lpads ");
3029 "-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux -lclearinghouse -lresolv -lpthread -lgscpinterface -lz");
3031 fprintf(outfl, " -lpz -lz -lbz ");
3032 if(libz_exists && libast_exists)
3033 fprintf(outfl," -last ");
3035 fprintf(outfl, " -ldll -ldl ");
3037 fprintf(outfl, " -L/usr/local/lib/ -lprotobuf-c ");
3038 fprintf(outfl," -lgscpaux");
3040 fprintf(outfl," -fprofile-arcs");
// Rules to compile lfta.o and to regenerate <host>_lfta.c from the query
// files via translate_fta.
3045 "lfta.o: %s_lfta.c\n"
3046 "\t$(CC) -o lfta.o -c %s_lfta.c\n"
3048 "%s_lfta.c: external_fcns.def %s ",hostname.c_str(), hostname.c_str(), hostname.c_str(),schema_file_name.c_str());
3049 for(i=0;i<nfiles;++i)
3050 fprintf(outfl," %s",input_file_names[i].c_str());
// Two translate_fta invocation forms — presumably with/without an explicit
// "-h <host>", chosen by an elided conditional; TODO confirm.
3052 fprintf(outfl,"\n\t%s/bin/translate_fta %s %s ",root_path.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
3054 fprintf(outfl,"\n\t%s/bin/translate_fta -h %s %s %s ", root_path.c_str(), hostname.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
3056 for(i=0;i<nfiles;++i)
3057 fprintf(outfl," %s",input_file_names[i].c_str());
3058 fprintf(outfl,"\n");
// Per-hfta link and compile rules.
3060 for(i=0;i<hfta_names.size();++i)
3063 "\t$(CPP) -o %s %s.o -L"+root_path+"/lib -lgscpapp -lgscphostaux -lgscphost -lgscpinterface -lgscphftaaux -lgscphostaux -lm -lgscpaux -lclearinghouse -lresolv -lpthread -lgscpaux -lgscphftaaux -lgscpaux %s\n"
3066 "\t$(CPP) -o %s.o -c %s.cc\n"
3069 hfta_names[i].c_str(), hfta_names[i].c_str(),
3070 hfta_names[i].c_str(), hfta_names[i].c_str(), extra_libs.c_str(),
3071 hfta_names[i].c_str(), hfta_names[i].c_str(),
3072 hfta_names[i].c_str(), hfta_names[i].c_str()
// Convenience rules: symlink packet_schema.txt / external_fcns.def from
// the install's cfg directory, and a clean rule.
3077 "packet_schema.txt:\n"
3078 "\tln -s "+root_path+"/cfg/packet_schema.txt .\n"
3080 "external_fcns.def:\n"
3081 "\tln -s "+root_path+"/cfg/external_fcns.def .\n"
3084 "\trm -rf core rts *.o %s_lfta.c external_fcns.def packet_schema.txt").c_str(),hostname.c_str());
3085 for(i=0;i<hfta_names.size();++i)
3086 fprintf(outfl," %s %s.cc",hfta_names[i].c_str(),hfta_names[i].c_str());
3087 fprintf(outfl,"\n");
// ---------------- runit ----------------
3093 // Gather the set of interfaces
3094 // TODO : must update to hanndle machines
3095 // TODO : lookup interface attributes and add them as a parameter to rts process
3096 outfl = fopen("runit", "w");
3098 fprintf(stderr,"Can't open runit for write, exiting.\n");
// Script body: start gshub.py in the background, read its registry
// address from gshub.log, record process groups in gs.pids, launch rts.
3106 +root_path+"/bin/gshub.py> /dev/null 2>&1 &\n"
3108 "if [ ! -f gshub.log ]\n"
3110 "\techo \"Failed to start bin/gshub.py\"\n"
3113 "ADDR=`cat gshub.log`\n"
3114 "ps opgid= $! >> gs.pids\n"
3115 "./rts $ADDR default ").c_str(), outfl);
// Append each interface name plus its configured "Command" attribute
// values as arguments to the rts invocation.
3118 for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
3119 string ifnm = (*ssi);
3120 fprintf(outfl, "%s ",ifnm.c_str());
3121 vector<string> ifv = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "Command", erri, err_str);
3122 for(j=0;j<ifv.size();++j)
3123 fprintf(outfl, "%s ",ifv[j].c_str());
3125 fprintf(outfl, " &\n");
3126 fprintf(outfl, "echo $! >> gs.pids\n");
// Launch each hfta binary, then each external (UDOP) view operator.
3127 for(i=0;i<hfta_names.size();++i)
3128 fprintf(outfl,"./%s $ADDR default &\n",hfta_names[i].c_str());
3130 for(j=0;j<opviews.opview_list.size();++j){
3131 fprintf(outfl,"%s/views/%s %s &\n",root_path.c_str(),opviews.opview_list[j]->exec_fl.c_str(), opviews.opview_list[j]->mangler.c_str());
3135 system("chmod +x runit");
// ---------------- stopit ----------------
// stopit TERM-kills every process group listed in gs.pids; a second loop
// over gs.pids (body elided here) follows.
3137 outfl = fopen("stopit", "w");
3139 fprintf(stderr,"Can't open stopit for write, exiting.\n");
3143 fprintf(outfl,"#!/bin/sh\n"
3145 "if [ ! -f gs.pids ]\n"
3149 "for pgid in `cat gs.pids`\n"
3151 "kill -TERM -$pgid\n"
3154 "for pgid in `cat gs.pids`\n"
3161 system("chmod +x stopit");
3163 //-----------------------------------------------
3165 /* For now disable support for virtual interfaces
3166 outfl = fopen("set_vinterface_hash.bat", "w");
3168 fprintf(stderr,"Can't open set_vinterface_hash.bat for write, exiting.\n");
3172 // The format should be determined by an entry in the ifres.xml file,
3173 // but for now hardcode the only example I have.
3174 for(ssi=base_vifaces.begin();ssi!=base_vifaces.end();++ssi){
3175 if(rts_hload.count((*ssi))){
3176 string iface_name = (*ssi);
3177 string iface_number = "";
3178 for(j=iface_name.size()-1; j>0 && iface_number == ""; j--){
3179 if(isdigit(iface_name[j])){
3180 iface_number = iface_name[j];
3181 if(j>0 && isdigit(iface_name[j-1]))
3182 iface_number = iface_name[j-1] + iface_number;
3186 fprintf(outfl,"dagconfig -d%s -S hat_range=",iface_number.c_str());
3187 vector<int> halloc = rts_hload[iface_name];
3189 for(j=0;j<halloc.size();++j){
3192 fprintf(outfl,"%d-%d",prev_limit,halloc[j]);
3193 prev_limit = halloc[j];
3195 fprintf(outfl,"\n");
3199 system("chmod +x set_vinterface_hash.bat");
// Build a query-local schema (qpSchema): the output table definitions of
// the LFTA subqueries, plus any other referenced tables pulled from the
// global Schema.
// NOTE(review): elided listing; the enclosing function and loop context
// are not visible here — confirm against the full source.
3203 // Code for implementing a local schema
3205 table_list qpSchema;
3207 // Load the schemas of any LFTAs.
3209 for(l=0;l<hfta_nbr;++l){
3210 stream_query *sq0 = split_queries[l];
3211 table_def *td = sq0->get_output_tabledef();
3212 qpSchema.append_table(td);
3214 // load the schemas of any other ref'd tables.
3216 vector<tablevar_t *> input_tbl_names = split_queries[hfta_nbr]->get_input_tables();
3218 for(ti=0;ti<input_tbl_names.size();++ti){
3219 int tbl_ref = qpSchema.get_table_ref(input_tbl_names[ti]->get_schema_name());
// Not an LFTA output table: fall back to the global schema, reporting an
// error if it is unknown there too (the guarding check is elided).
3221 tbl_ref = Schema->get_table_ref(input_tbl_names[ti]->get_schema_name());
3223 fprintf(stderr,"ERROR file %s references table %s, which is not in the schema.\n",input_file_names[i].c_str(), (input_tbl_names[ti]->get_schema_name()).c_str());
3226 qpSchema.append_table(Schema->get_table(tbl_ref));
3231 // Functions related to parsing.
// Split instr at each occurrence of sep, storing word pointers into
// words[] — presumably in place, with separators overwritten by NULs on
// elided lines (TODO confirm). Caps the word count at max_words with a
// warning to stderr. Return statement lies past this view.
3234 static int split_string(char *instr,char sep, char **words,int max_words){
3240 words[nwords++] = str;
3241 while( (loc = strchr(str,sep)) != NULL){
3244 if(nwords >= max_words){
3245 fprintf(stderr,"Error in split_string, too many words discovered (max is %d)\n",max_words);
// Clamp so the final word overwrites the last slot rather than
// overflowing the caller's array.
3246 nwords = max_words-1;
3248 words[nwords++] = str;